import asyncio
import json
import os
import pickle
import random

import nltk
import numpy as np
import redis
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer
from tensorflow.keras import Sequential, Input
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import SGD

# NLTK resources needed for tokenization and lemmatization.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('wordnet')
nltk.download('omw-1.4')

load_dotenv()

app = FastAPI()
lemmatizer = WordNetLemmatizer()

# Redis stores the vocabulary, the class list, and the queue of user questions
# collected from the /chat endpoint.
redis_password = os.getenv("REDIS_PASSWORD")
r = redis.Redis(
    host=os.getenv("REDIS_HOST"),
    port=int(os.getenv("REDIS_PORT")),
    password=redis_password,
)


def initialize_redis():
    global r
    try:
        r.ping()
        print("Redis connection successful.")
    except redis.exceptions.ConnectionError:
        print("Error connecting to Redis. Exiting.")
        exit(1)


async def train_and_save_model():
    """Periodically rebuild the training data, train the model, and save it."""
    global lemmatizer, r
    while True:
        words = []
        classes = []
        documents = []
        ignore_words = ['?', '!']

        # Load the intents file, creating an empty skeleton if it is missing.
        try:
            with open('intents.json') as file:
                intents = json.load(file)
        except FileNotFoundError:
            intents = {"intents": []}
            with open('intents.json', 'w') as file:
                json.dump(intents, file, indent=4)
            print("intents.json created. Please populate it with training data.")
            await asyncio.sleep(60)
            continue

        # Fold previously collected user questions into the training documents,
        # using their stored tag when one exists and "unknown" otherwise.
        if not r.exists('user_questions_loaded'):
            user_questions = r.lrange('user_questions', 0, -1)
            for question in user_questions:
                question = question.decode('utf-8')
                try:
                    existing_tag = r.get(f"tag:{question}").decode('utf-8')
                    documents.append((nltk.word_tokenize(question), existing_tag))
                    if existing_tag not in classes:
                        classes.append(existing_tag)
                except AttributeError:
                    documents.append((nltk.word_tokenize(question), "unknown"))
                    if "unknown" not in classes:
                        classes.append("unknown")
            r.set('user_questions_loaded', 1)

        for intent in intents['intents']:
            for pattern in intent['patterns']:
                w = nltk.word_tokenize(pattern)
                words.extend(w)
                documents.append((w, intent['tag']))
                if intent['tag'] not in classes:
                    classes.append(intent['tag'])

            # Generate additional pattern variants in memory by swapping words
            # for WordNet synonyms; the variants are appended to the intent's
            # pattern list.
            for _ in range(100000):
                if not intent['patterns']:
                    break
                new_pattern = []
                for word in random.choice(intent['patterns']).split():
                    synonyms = wordnet.synsets(word)
                    if synonyms:
                        synonym = random.choice(synonyms[0].lemmas()).name()
                        new_pattern.append(synonym)
                    else:
                        new_pattern.append(word)
                intent['patterns'].append(" ".join(new_pattern))

        # Lemmatize, deduplicate, and sort the vocabulary.
        words = [lemmatizer.lemmatize(w.lower()) for w in words if w not in ignore_words]
        words = sorted(set(words))

        # Build bag-of-words vectors and one-hot class labels.
        training = []
        output_empty = [0] * len(classes)
        for doc in documents:
            pattern_words = [lemmatizer.lemmatize(word.lower()) for word in doc[0]]
            bag = [1 if w in pattern_words else 0 for w in words]
            output_row = list(output_empty)
            output_row[classes.index(doc[1])] = 1
            training.append([bag, output_row])

        training = np.array(training, dtype=object)
        if not training.size:
            print("No training data yet. Waiting...")
            await asyncio.sleep(60)
            continue

        train_x = list(training[:, 0])
        train_y = list(training[:, 1])

        # Reuse the previously saved model when one exists; otherwise build a
        # fresh feed-forward network.
        if r.exists('model'):
            model = load_model('chatbot_model')
        else:
            model = Sequential()
            model.add(Input(shape=(len(train_x[0]),)))
            model.add(Dense(128, activation='relu'))
            model.add(Dropout(0.5))
            model.add(Dense(64, activation='relu'))
            model.add(Dropout(0.5))
            model.add(Dense(len(train_y[0]), activation='softmax'))
            sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True)
            model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy'])

        model.fit(np.array(train_x), np.array(train_y), epochs=1, batch_size=len(train_x))

        # Merge the new vocabulary and class lists with whatever is already in Redis.
        if not r.exists('words'):
            r.set('words', pickle.dumps(words))
        else:
            existing_words = pickle.loads(r.get('words'))
            words = sorted(set(existing_words + words))
            r.set('words', pickle.dumps(words))

        if not r.exists('classes'):
            r.set('classes', pickle.dumps(classes))
        else:
            existing_classes = pickle.loads(r.get('classes'))
            classes = sorted(set(existing_classes + classes))
            r.set('classes', pickle.dumps(classes))

        model.save('chatbot_model')
        # Mark that a saved model exists; without this flag the load_model()
        # branch above is unreachable.
        r.set('model', 1)
        print("Data and model saved. Re-training...")


@app.post("/chat")
async def chat(message: str):
    words = pickle.loads(r.get('words'))
    classes = pickle.loads(r.get('classes'))
    model = load_model('chatbot_model')

    # Turn the incoming message into a bag-of-words vector over the saved vocabulary.
    sentence_words = [lemmatizer.lemmatize(word.lower()) for word in nltk.word_tokenize(message)]
    bag = [0] * len(words)
    for s in sentence_words:
        for i, w in enumerate(words):
            if w == s:
                bag[i] = 1

    # Keep every intent whose predicted probability clears the threshold,
    # sorted from most to least likely.
    predictions = model.predict(np.array([bag]))[0]
    ERROR_THRESHOLD = 0.25
    results = [[i, p] for i, p in enumerate(predictions) if p > ERROR_THRESHOLD]
    results.sort(key=lambda x: x[1], reverse=True)
    return_list = [{"intent": classes[i], "probability": str(p)} for i, p in results]

    # Queue the question so it can be tagged and folded into future training runs.
    r.rpush('user_questions', message)
    return return_list


@app.post("/tag")
async def tag_question(question: str, tag: str):
    r.set(f"tag:{question}", tag)
    return {"message": "Tag saved"}


# Minimal landing page. The original markup survived only as its text content
# ("Chatbot"), so this is a bare-bones reconstruction.
html_code = """
<!DOCTYPE html>
<html>
<head>
    <title>Chatbot</title>

</head>
<body>
    <h1>Chatbot</h1>
</body>
</html>
"""


@app.get("/", response_class=HTMLResponse)
async def root():
    return html_code


@app.on_event("startup")
async def start_background_training():
    # uvicorn.run() blocks, so the original asyncio.run(train_and_save_model())
    # placed after it could never execute; schedule the training loop as a
    # background task on the server's event loop instead.
    asyncio.create_task(train_and_save_model())


if __name__ == "__main__":
    initialize_redis()
    uvicorn.run(app, host="0.0.0.0", port=7860)