import subprocess import os import torch from dotenv import load_dotenv from langchain_community.vectorstores import Qdrant from langchain_huggingface import HuggingFaceEmbeddings from langchain.prompts import ChatPromptTemplate from langchain.schema.runnable import RunnablePassthrough from langchain.schema.output_parser import StrOutputParser from qdrant_client import QdrantClient, models from langchain_openai import ChatOpenAI import gradio as gr import logging from typing import List, Tuple, Generator from dataclasses import dataclass from datetime import datetime from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from langchain_huggingface.llms import HuggingFacePipeline from langchain_cerebras import ChatCerebras from queue import Queue from threading import Thread from langchain.chains import LLMChain from langchain_core.prompts import PromptTemplate from langchain_huggingface import HuggingFaceEndpoint from langchain_google_genai import ChatGoogleGenerativeAI # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class Message: role: str content: str timestamp: str class ChatHistory: def __init__(self): self.messages: List[Message] = [] def add_message(self, role: str, content: str): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.messages.append(Message(role=role, content=content, timestamp=timestamp)) def get_formatted_history(self, max_messages: int = 10) -> str: recent_messages = self.messages[-max_messages:] if len(self.messages) > max_messages else self.messages formatted_history = "\n".join([ f"{msg.role}: {msg.content}" for msg in recent_messages ]) return formatted_history def clear(self): self.messages = [] # Load environment variables and setup load_dotenv() HF_TOKEN = os.getenv("HF_TOKEN") C_apikey = os.getenv("C_apikey") OPENAPI_KEY = os.getenv("OPENAPI_KEY") GEMINI = os.getenv("GEMINI") if not HF_TOKEN: logger.error("HF_TOKEN is not set in the environment variables.") exit(1) embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") try: client = QdrantClient( url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"), prefer_grpc=True ) except Exception as e: logger.error("Failed to connect to Qdrant.") exit(1) collection_name = "mawared" try: client.create_collection( collection_name=collection_name, vectors_config=models.VectorParams( size=384, distance=models.Distance.COSINE ) ) except Exception as e: if "already exists" not in str(e): logger.error(f"Error creating collection: {e}") exit(1) db = Qdrant( client=client, collection_name=collection_name, embeddings=embeddings, ) retriever = db.as_retriever( search_type="similarity", search_kwargs={"k": 5} ) #llm = ChatCerebras( # model="llama-3.3-70b", # api_key=C_apikey, # streaming=True #) # llm = ChatOpenAI( # model="meta-llama/Llama-3.3-70B-Instruct", # temperature=0, # max_tokens=None, # timeout=None, # max_retries=2, # api_key=HF_TOKEN, # if you prefer to pass api key in directly instaed of using env vars # base_url="https://api-inference.huggingface.co/v1/", # stream=True, # ) llm = ChatGoogleGenerativeAI( model="gemini-2.0-flash-thinking-exp-01-21", temperature=0, max_tokens=None, timeout=None, max_retries=2, api_key=GEMINI, stream=True, ) template = """ You are a specialized friendly AI assistant for the Mawared HR System, designed to provide accurate and contextually relevant support based solely on the provided context and chat history. Core Principles Source of Truth: Use only the information available in the retrieved context and chat history. Do not fabricate details or access external knowledge. Clarity and Precision: Communicate clearly, concisely, and professionally, using straightforward language for easy comprehension. Actionable Guidance: Deliver practical solutions, step-by-step workflows, and troubleshooting advice directly related to Mawared HR queries. Structured Instructions: Provide numbered, easy-to-follow instructions when explaining complex processes. Targeted Clarification: If a query lacks detail, ask specific questions to obtain the necessary information, explicitly stating what is missing. Exclusive Focus: Address only Mawared HR-related topics and avoid unrelated discussions. Professional Tone: Maintain a friendly, approachable, and professional demeanor. Response Guidelines Analyze the Query Thoughtfully: Start by thoroughly examining the user's question and reviewing the chat history. Consider what the user explicitly asked and infer their intent from the context provided. Mentally identify potential gaps in information before proceeding. Break Down Context Relevance: Isolate and interpret relevant pieces of context that apply directly to the query. Match the user's needs with the most relevant data from the context or chat history. Develop the Response in a Stepwise Manner: Construct a logical chain of thoughts: What does the user want to achieve? Which parts of the context can address this? What steps or details are needed for clarity? Provide responses in a structured, easy-to-follow format (e.g., numbered lists, bullet points). Ask for Clarifications Strategically: If the query lacks sufficient detail, identify the precise information missing. Frame your clarification politely and explicitly (e.g., “Could you confirm [specific detail] to proceed with [action/task]?”). Ensure Directness and Professionalism: Avoid unnecessary elaborations or irrelevant information. Maintain a friendly, professional tone throughout the response. Double-Check for Exclusivity: Ensure all guidance is strictly based on the retrieved context and chat history. Avoid speculating or introducing external knowledge about Mawared HR. Handling Information Gaps If the provided context is insufficient to answer the query: State explicitly that additional information is required to proceed. Clearly outline what details are missing. Avoid fabricating details or making assumptions. Critical Constraint STRICTLY rely on the provided context and chat history for all responses. Do not generate information about Mawared HR beyond these sources. Note: Do not mention a human support contact unless explicitly asked. Refuse to answer any questions thats not related to mawared Hr. You should think step by step to figure out the answer. Previous Conversation: {chat_history} Retrieved Context: {context} Current Question: {question} Answer:{{answer}} """ prompt = ChatPromptTemplate.from_template(template) def create_rag_chain(chat_history: str): chain = ( { "context": retriever, "question": RunnablePassthrough(), "chat_history": lambda x: chat_history } | prompt | llm | StrOutputParser() ) return chain chat_history = ChatHistory() def process_stream(stream_queue: Queue, history: List[List[str]]) -> Generator[List[List[str]], None, None]: """Process the streaming response and update the chat interface""" current_response = "" while True: chunk = stream_queue.get() if chunk is None: # Signal that streaming is complete break current_response += chunk new_history = history.copy() new_history[-1][1] = current_response # Update the assistant's message yield new_history def ask_question_gradio(question: str, history: List[List[str]]) -> Generator[tuple, None, None]: try: if history is None: history = [] chat_history.add_message("user", question) formatted_history = chat_history.get_formatted_history() rag_chain = create_rag_chain(formatted_history) # Update history with user message and empty assistant message history.append([question, ""]) # User message # Create a queue for streaming responses stream_queue = Queue() # Function to process the stream in a separate thread def stream_processor(): try: for chunk in rag_chain.stream(question): stream_queue.put(chunk) stream_queue.put(None) # Signal completion except Exception as e: logger.error(f"Streaming error: {e}") stream_queue.put(None) # Start streaming in a separate thread Thread(target=stream_processor).start() # Yield updates to the chat interface response = "" for updated_history in process_stream(stream_queue, history): response = updated_history[-1][1] yield "", updated_history # Add final response to chat history chat_history.add_message("assistant", response) except Exception as e: logger.error(f"Error during question processing: {e}") if not history: history = [] history.append([question, "An error occurred. Please try again later."]) yield "", history def clear_chat(): chat_history.clear() return [], "" # Gradio Interface with gr.Blocks() as iface: gr.Image("Image.jpg", width=750, height=300, show_label=False, show_download_button=False) gr.Markdown("# Mawared HR Assistant 3.0.0") gr.Markdown('### Instructions') gr.Markdown("Ask a question about MawaredHR and get a detailed answer") chatbot = gr.Chatbot( height=750, show_label=False, bubble_full_width=False, ) with gr.Row(): with gr.Column(scale=20): question_input = gr.Textbox( label="Ask a question:", placeholder="Type your question here...", show_label=False ) with gr.Column(scale=4): with gr.Row(): with gr.Column(): send_button = gr.Button("Send", variant="primary", size="sm") clear_button = gr.Button("Clear Chat", size="sm") # Handle both submit events (Enter key and Send button) submit_events = [question_input.submit, send_button.click] for submit_event in submit_events: submit_event( ask_question_gradio, inputs=[question_input, chatbot], outputs=[question_input, chatbot] ) clear_button.click( clear_chat, outputs=[chatbot, question_input] ) if __name__ == "__main__": iface.launch()