from setup import *
from typing import List, Optional
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, get_buffer_string
from langgraph.constants import Send
from operator import add
from langgraph.graph import MessagesState
from typing import Annotated
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings.jina import JinaEmbeddings
# from langchain_huggingface import HuggingFaceEmbeddings
class Analyst(BaseModel):
affiliation: str = Field(
description="Primary affiliation of the analyst.",
)
name: str = Field(
description="Name of the analyst."
)
role: str = Field(
description="Role of the analyst in the context of the topic.",
)
description: str = Field(
description="Description of the analyst focus, concerns, and motives.",
)
@property
def persona(self) -> str:
return f"Name: {self.name}\nRole: {self.role}\nAffiliation: {self.affiliation}\nDescription: {self.description}\n"
class Perspectives(BaseModel):
analysts: List[Analyst] = Field(
description="Comprehensive list of analysts with their roles and affiliations.",
)
class GenerateAnalystsState(TypedDict):
topic: str # Research topic
max_analysts: int # Number of analysts to generate
analysts: List[Analyst] # Analyst asking questions
class InterviewState(MessagesState):
max_num_turns: int # Number turns of conversation
context: Annotated[list, add] # Source docs
analyst: Analyst # Analyst asking questions
interview: str # Interview transcript
sections: list # Final key we duplicate in outer state for Send() API
class SearchQuery(BaseModel):
search_query: str = Field(None, description="Search query for retrieval.")
def create_analysts(state: GenerateAnalystsState):
""" Create analysts """
topic=state['topic']
max_analysts=state['max_analysts']
structured_llm = llm.with_structured_output(Perspectives)
analyst_instructions = """You are tasked with creating a set of AI analyst personas. Follow these instructions carefully:
1. First, review the research topic:{topic}
2. Create {max_analysts} analysts with following roles:
- Industry expert
- GenAI expert
- Business strategist
3. Determine the most interesting themes based upon documents and/or feedback above.
4. Pick the top {max_analysts} themes.
5. For each theme, create one analyst with ALL of the following required fields: - name: A fitting name for the analyst - role: Their specific role or title - affiliation: Their primary organization or institution - description: A detailed description of their focus areas, concerns, and motives
6. Ensure every analyst includes all four fields without exception.
Remember: Every analyst **MUST** have all four fields (name, role, affiliation, and description) properly defined. Incomplete personas are not acceptable."""
# System message
system_message = analyst_instructions.format(topic=topic, max_analysts=max_analysts)
analysts = structured_llm.invoke([SystemMessage(content=system_message)]+[HumanMessage(content="Generate the set of analysts.")])
# Write the list of analysis to state
return {"analysts": analysts.analysts}
def vectorstore_writing(doc_splits):
global retriever
vectorstore = Chroma.from_documents(
documents=doc_splits,
collection_name="rag-chroma",
embedding = JinaEmbeddings(model_name='jina-embeddings-v3'),
persist_directory='./chroma_db'
)
retriever = vectorstore.as_retriever()
def generate_question(state:InterviewState):
""" Generate questions for the interview """
# print('----STATE----', state)
# Get the analyst persona
analyst = state['analyst']
messages = state['messages']
context = state["context"]
question_instructions = """You are an analyst tasked with interviewing an expert to learn about the use of Generative AI (GenAI) applications in a specific industry or company, if mentioned.
Your goal is to uncover interesting and specific insights related to the topic of Generative AI use cases.
Interesting: Insights that are surprising, non-obvious, or reveal unique applications of GenAI in the industry or company.
Specific: Insights that avoid generalities and include specific examples or case studies relevant to the company’s offerings, strategic focus areas, or the industry’s needs.
Focus Areas:
Explore the company's key offerings and strategic focus areas (e.g., operations, supply chain, customer experience, etc.), if the company is named.
Discuss industry-wide trends, innovations, and opportunities enabled by GenAI, such as improved operational efficiency, enhanced customer experiences, or streamlined supply chain processes.
Gather details on the company or industry's vision and products, focusing on how GenAI can be applied to enhance or transform their workflows.
Task:
Begin by introducing yourself with a name that fits your persona, then ask your question.
Continue asking follow-up questions to drill down into:
Specific GenAI use cases within the company's domain or the industry.
How these applications align with the company's or industry's strategic goals.
Real-world examples or future opportunities for integrating GenAI into their processes.
Complete the interview by saying:
"Thank you so much for your help!"
Remember to stay in character throughout the conversation, reflecting your persona and the provided goals."""
# Generate the question
question = llm.invoke([SystemMessage(content=question_instructions)]+[HumanMessage(content="Generate the question.")])
return {"messages": [question]}
def search_vectorstore(state: InterviewState):
""" Retrieve docs from Docstore """
# Search query writing
search_instructions = SystemMessage(content=f"""You will be given a conversation between an analyst and an expert.
Your goal is to generate a well-structured query for use in retrieval and / or web-search related to the conversation.
First, analyze the full conversation.
Pay particular attention to the final question posed by the analyst.
Convert this final question into a well-structured web search query""")
# Search query
structured_llm = llm.with_structured_output(SearchQuery)
search_query = structured_llm.invoke([search_instructions]+state['messages'])
# Search
search_docs = retriever.invoke(input=search_query.search_query)
# Format
formatted_search_docs = "\n\n---\n\n".join(
[
f'\n{doc.page_content}\n'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
def generate_answer(state: InterviewState):
""" Node to answer a question """
# Get state
analyst = state["analyst"]
messages = state["messages"]
context = state["context"]
answer_instructions = """You are an expert being interviewed by an analyst.
Here is analyst area of focus: {goals}.
You goal is to answer a question posed by the interviewer.
To answer question, use this context:
{context}
When answering questions, follow these guidelines:
1. Use only the information provided in the context.
2. Do not introduce external information or make assumptions beyond what is explicitly stated in the context.
3. The context contain sources at the topic of each individual document.
4. Include these sources your answer next to any relevant statements. For example, for source # 1 use [1].
5. List your sources in order at the bottom of your answer. [1] Source 1, [2] Source 2, etc
6. If the source is: ' then just list:
[1] assistant/docs/llama3_1.pdf, page 7
And skip the addition of the brackets as well as the Document source preamble in your citation."""
# Answer question
system_message = answer_instructions.format(goals=analyst.persona, context=context)
answer = llm.invoke([SystemMessage(content=system_message)]+messages)
# Name the message as coming from the expert
answer.name = "expert"
# Append it to state
return {"messages": [answer]}
def save_interview(state: InterviewState):
""" Save interviews """
# Get messages
messages = state["messages"]
# Convert interview to a string
interview = get_buffer_string(messages)
# Save to interviews key
return {"interview": interview}
def route_messages(state: InterviewState,
name: str = "expert"):
""" Route between question and answer """
# Get messages
messages = state["messages"]
max_num_turns = state.get('max_num_turns',2)
# Check the number of expert answers
num_responses = len(
[m for m in messages if isinstance(m, AIMessage) and m.name == name]
)
# End if expert has answered more than the max turns
if num_responses >= max_num_turns:
return 'save_interview'
# This router is run after each question - answer pair
# Get the last question asked to check if it signals the end of discussion
last_question = messages[-2]
if "Thank you so much for your help" in last_question.content:
return 'save_interview'
return "ask_question"
def write_section(state: InterviewState):
""" Node to answer a question """
# Get state
interview = state["interview"]
context = state["context"]
analyst = state["analyst"]
section_writer_instructions = """You are an expert technical writer.
Your task is to create a short, easily digestible section of a report based on a set of source documents.
1. Analyze the content of the source documents:
- The name of each source document is at the start of the document, with the