__import__('pysqlite3')  # Workaround for the sqlite3 version error on live Streamlit.
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')  # Workaround for the sqlite3 version error on live Streamlit.

import os
import traceback

import graphviz
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import TypedDict, List, Literal, Dict, Any
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory

from pdf_writer import generate_pdf
from crew import CrewClass, Essay


class GraphState(TypedDict):
    topic: str
    response: str
    documents: List[str]
    essay: Dict[str, Any]
    pdf_name: str


class RouteQuery(BaseModel):
    """Route a user query to write_essay, edit_essay, or a direct answer."""

    way: Literal["edit_essay", "write_essay", "answer"] = Field(
        ...,
        description="Given a user question, choose to route it to write_essay, edit_essay, or answer",
    )


class EssayWriter:
    def __init__(self):
        self.model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)
        self.crew = CrewClass(llm=ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0.5))

        self.memory = ConversationBufferMemory()
        self.essay = {}

        self.router_prompt = """
            You are a router, and your duty is to route the user to the correct expert.
            Always check the conversation history and base your decision on it.
            If the topic is about memory or daily talk, route the user to the answer expert.
            If the topic starts with something like "Can you write", or the user requests an article or essay, route the user to the write_essay expert.
            If the topic is about editing an essay, route the user to the edit_essay expert.

            Conversation History: {memory}
            Topic: {topic}
            """

        self.simple_answer_prompt = """
            You are an expert providing a simple answer to the user's question.

            Conversation History: {memory}
            Topic: {topic}
            """

        # Wire up the graph: three worker nodes, with the router choosing the
        # entry point and every node finishing the run.
        builder = StateGraph(GraphState)
        builder.add_node("answer", self.answer)
        builder.add_node("write_essay", self.write_essay)
        builder.add_node("edit_essay", self.edit_essay)

        builder.set_conditional_entry_point(
            self.router_query,
            {
                "write_essay": "write_essay",
                "answer": "answer",
                "edit_essay": "edit_essay",
            },
        )
        builder.add_edge("write_essay", END)
        builder.add_edge("edit_essay", END)
        builder.add_edge("answer", END)
        self.graph = builder.compile()

        self.save_workflow_graph()
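
    # router_query is the graph's conditional entry point: it must return one of
    # the keys registered in __init__ ("write_essay", "edit_essay", "answer"),
    # and LangGraph then dispatches the state to the matching node.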
    def router_query(self, state: GraphState):
        print("**ROUTER**")
        prompt = PromptTemplate.from_template(self.router_prompt)
        memory = self.memory.load_memory_variables({})

        # Bind the structured-output model to a local that does not shadow
        # the method name.
        structured_router = self.model.with_structured_output(RouteQuery)
        chain = prompt | structured_router
        result: RouteQuery = chain.invoke({"topic": state["topic"], "memory": memory})

        print("Router Result: ", result.way)
        return result.way

    def answer(self, state: GraphState):
        print("**ANSWER**")
        prompt = PromptTemplate.from_template(self.simple_answer_prompt)
        memory = self.memory.load_memory_variables({})
        chain = prompt | self.model | StrOutputParser()
        result = chain.invoke({"topic": state["topic"], "memory": memory})

        self.memory.save_context(inputs={"input": state["topic"]}, outputs={"output": result})
        return {"response": result}

    def write_essay(self, state: GraphState):
        print("**ESSAY COMPLETION**")

        # Generate the essay using the crew
        self.essay = self.crew.kickoff({"topic": state["topic"]})

        # Save the conversation context
        self.memory.save_context(inputs={"input": state["topic"]}, outputs={"output": str(self.essay)})

        # Generate the PDF and return the essay content for preview
        pdf_name = generate_pdf(self.essay)

        return {
            "response": "Here is your essay! You can review it below before downloading.",
            "essay": self.essay,
            "pdf_name": pdf_name,
        }

    def edit_essay(self, state: GraphState):
        print("**ESSAY EDIT**")
        memory = self.memory.load_memory_variables({})

        user_request = state["topic"]
        parser = JsonOutputParser(pydantic_object=Essay)

        prompt = PromptTemplate(
            template=(
                "Edit the JSON file as the user requested, and return the new JSON file."
                "\n Request: {user_request}"
                "\n Conversation History: {memory}"
                "\n JSON File: {essay}"
                "\n{format_instructions}"
            ),
            input_variables=["memory", "user_request", "essay"],
            partial_variables={"format_instructions": parser.get_format_instructions()},
        )

        chain = prompt | self.model | parser

        # Update the essay with the requested edits
        self.essay = chain.invoke({"user_request": user_request, "memory": memory, "essay": self.essay})

        # Save the conversation context
        self.memory.save_context(inputs={"input": state["topic"]}, outputs={"output": str(self.essay)})

        # Generate the PDF and return the essay content for preview
        pdf_name = generate_pdf(self.essay)

        return {
            "response": "Here is your edited essay! You can review it below before downloading.",
            "essay": self.essay,
            "pdf_name": pdf_name,
        }
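
    # save_workflow_graph is a debugging aid. It writes under /tmp because that
    # is typically the only writable location on hosted platforms such as
    # Streamlit Community Cloud; anything written there is ephemeral.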
    def save_workflow_graph(self):
        """Generate and save a Graphviz visualization of the workflow, with logging."""
        log_file = "/tmp/graph_debug.log"
        try:
            output_path = "/tmp/graph"
            dot = graphviz.Digraph(format="png")
            dot.attr(dpi="300")

            # Define nodes, including an explicit terminal node so the "Done"
            # edges below don't create an implicit one
            dot.node("Router", "🔀 Router")
            dot.node("Write Essay", "📝 Write Essay")
            dot.node("Edit Essay", "✏️ Edit Essay")
            dot.node("Answer", "💬 Answer")
            dot.node("Done", "✅ Done")

            # Define edges
            dot.edge("Router", "Write Essay")
            dot.edge("Router", "Edit Essay")
            dot.edge("Router", "Answer")
            dot.edge("Write Essay", "Done")
            dot.edge("Edit Essay", "Done")
            dot.edge("Answer", "Done")

            # Render the graph into /tmp/
            dot.render(output_path, format="png", cleanup=False)

            # Confirm the file was actually written
            graph_path = "/tmp/graph.png"
            if os.path.exists(graph_path):
                with open(log_file, "w") as f:
                    f.write("✅ Graphviz successfully generated /tmp/graph.png\n")
                print("✅ Graphviz successfully generated /tmp/graph.png")
            else:
                raise FileNotFoundError("❌ Graphviz failed to generate /tmp/graph.png")
        except Exception:
            # Capture the full traceback so the failure is diagnosable from the log
            error_message = f"❌ Error generating workflow visualization:\n{traceback.format_exc()}\n"
            with open(log_file, "w") as f:
                f.write(error_message)
            print(error_message)
            return error_message
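

# A minimal usage sketch, assuming OPENAI_API_KEY is set in the environment and
# that crew.py and pdf_writer.py (not shown here) are importable. The topics are
# illustrative: the compiled graph sends each request through the router first,
# so the first should reach the "answer" node and the second "write_essay".
if __name__ == "__main__":
    writer = EssayWriter()

    # A conversational message routes to the simple answer expert.
    print(writer.graph.invoke({"topic": "Hi! What can you do?"}))

    # An explicit request routes to the essay writer; the result should carry
    # "essay" and "pdf_name" keys alongside "response".
    print(writer.graph.invoke({"topic": "Can you write an essay about the history of the Internet?"}))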