import uvicorn
import nltk

nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('averaged_perceptron_tagger')
nltk.download('punkt_tab')

from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
from tqdm import tqdm
from tqdm.keras import TqdmCallback
import json
import pickle
import random
import asyncio
import concurrent.futures
import multiprocessing
import io
import os
import tempfile
import numpy as np
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Input
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.models import load_model, save_model
import redis
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

load_dotenv()

app = FastAPI()
lemmatizer = WordNetLemmatizer()

redis_password = os.getenv("REDIS_PASSWORD")
r = redis.Redis(host=os.getenv("REDIS_HOST"), port=int(os.getenv("REDIS_PORT")), password=redis_password)


# Load existing data from files if they exist and upload to Redis
def load_data_to_redis():
    files_to_load = {
        'intents.json': 'intents',
        'classes.pkl': 'classes',
        'words.pkl': 'words',
        'chatbot_model.h5': 'chatbot_model'
    }
    for file_name, redis_key in files_to_load.items():
        if os.path.exists(file_name) and not r.exists(redis_key):
            print(f"Loading {file_name} to Redis...")
            if file_name.endswith('.json'):
                with open(file_name) as f:
                    data = json.load(f)
                r.set(redis_key, json.dumps(data))
            elif file_name.endswith('.h5'):
                with open(file_name, 'rb') as f:
                    r.set(redis_key, f.read())
            else:
                with open(file_name, 'rb') as f:
                    r.set(redis_key, pickle.dumps(pickle.load(f)))


# Ensure directories exist
if not os.path.exists('models'):
    os.makedirs('models')


def initialize_redis():
    global r
    try:
        r.ping()
        print("Redis connection successful.")
        load_data_to_redis()  # Load data on successful connection
    except redis.exceptions.ConnectionError:
        print("Error connecting to Redis. Exiting.")
Exiting.") exit(1) async def train_and_save_model(): global lemmatizer, r while True: words = [] classes = [] documents = [] ignore_words = ['?', '!'] intents = json.loads(r.get('intents')) print("Loading user questions from Redis...") if not r.exists('user_questions_loaded'): user_questions = r.lrange('user_questions', 0, -1) for question in user_questions: question = question.decode('utf-8') try: existing_tag = r.get(f"tag:{question}").decode('utf-8') documents.append((nltk.word_tokenize(question), existing_tag)) if existing_tag not in classes: classes.append(existing_tag) except AttributeError: documents.append((nltk.word_tokenize(question), "unknown")) if "unknown" not in classes: classes.append("unknown") r.set('user_questions_loaded', 1) print("Processing intents from Redis...") for intent in intents['intents']: for pattern in intent['patterns']: w = nltk.word_tokenize(pattern) words.extend(w) documents.append((w, intent['tag'])) if intent['tag'] not in classes: classes.append(intent['tag']) print(f"Generating synonyms for intent '{intent['tag']}'...") with multiprocessing.Pool() as pool: results = [] for _ in tqdm(range(100000), desc="Generating synonyms", leave=False): if not intent['patterns']: break results.append(pool.apply_async(generate_synonym_pattern, (intent['patterns'],))) for result in results: new_pattern = result.get() if new_pattern: intent['patterns'].append(new_pattern) words = [lemmatizer.lemmatize(w.lower()) for w in words if w not in ignore_words] words = sorted(list(set(words))) classes = sorted(list(set(classes))) print("Creating training data...") training = [] output_empty = [0] * len(classes) for doc in documents: bag = [] pattern_words = doc[0] pattern_words = [lemmatizer.lemmatize(word.lower()) for word in pattern_words] for w in words: bag.append(1) if w in pattern_words else bag.append(0) output_row = list(output_empty) output_row[classes.index(doc[1])] = 1 training.append([bag, output_row]) if not training: print("No training data yet. Waiting...") await asyncio.sleep(60) continue train_x = np.array([row[0] for row in training]) train_y = np.array([row[1] for row in training]) print("Loading or creating model...") if r.exists('chatbot_model'): with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: temp_file.write(r.get('chatbot_model')) temp_file_name = temp_file.name model = load_model(temp_file_name) os.remove(temp_file_name) else: input_layer = Input(shape=(len(train_x[0]),)) layer1 = Dense(128, activation='relu')(input_layer) layer2 = Dropout(0.5)(layer1) layer3 = Dense(64, activation='relu')(layer2) layer4 = Dropout(0.5)(layer3) output_layer = Dense(len(classes), activation='softmax')(layer4) model = Sequential(layers=[input_layer, layer1, layer2, layer3, layer4, output_layer]) sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True) model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy']) print("Training the model...") model.fit(train_x, train_y, epochs=1, batch_size=len(train_x), verbose=0, callbacks=[TqdmCallback(verbose=2)]) print("Saving data to Redis...") r.set('words', pickle.dumps(words)) r.set('classes', pickle.dumps(classes)) with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: save_model(model, temp_file.name) with open(temp_file.name, 'rb') as f: r.set('chatbot_model', f.read()) os.remove(temp_file.name) print("Data and model saved. 
Re-training...") def generate_synonym_pattern(patterns): new_pattern = [] for word in random.choice(patterns).split(): synonyms = wordnet.synsets(word) if synonyms: synonym = random.choice(synonyms[0].lemmas()).name() new_pattern.append(synonym) else: new_pattern.append(word) return " ".join(new_pattern) def start_training_loop(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(train_and_save_model()) class ChatMessage(BaseModel): message: str @app.post("/chat") async def chat(message: ChatMessage): words = pickle.loads(r.get('words')) classes = pickle.loads(r.get('classes')) with io.BytesIO(r.get('chatbot_model')) as f: with tempfile.NamedTemporaryFile(delete=False, suffix='.h5') as temp_file: temp_file.write(f.read()) temp_file_name = temp_file.name model = load_model(temp_file_name) os.remove(temp_file_name) sentence_words = nltk.word_tokenize(message.message) sentence_words = [lemmatizer.lemmatize(word.lower()) for word in sentence_words] bag = [0] * len(words) for s in sentence_words: for i, w in enumerate(words): if w == s: bag[i] = 1 p = model.predict(np.array([bag]))[0] ERROR_THRESHOLD = 0.25 results = [[i, p] for i, p in enumerate(p) if p > ERROR_THRESHOLD] results.sort(key=lambda x: x[1], reverse=True) return_list = [] for i, p in results: return_list.append({"intent": classes[i], "probability": str(p)}) r.rpush('user_questions', message.message) asyncio.create_task(train_and_save_model()) return return_list @app.post("/tag") async def tag_question(question: str, tag: str): r.set(f"tag:{question}", tag) return {"message": "Tag saved"} html_code = """ Chatbot

Chatbot

""" @app.get("/", response_class=HTMLResponse) async def root(): return html_code if __name__ == "__main__": initialize_redis() training_process = multiprocessing.Process(target=start_training_loop) training_process.start() uvicorn.run(app, host="0.0.0.0", port=7860)