from setup import *  # expects `llm` (the chat model used throughout) to be defined here

from operator import add
from typing import Annotated, List, Optional

from typing_extensions import TypedDict
from pydantic import BaseModel, Field

from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    get_buffer_string,
)
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings.jina import JinaEmbeddings

from langgraph.graph import START, END, MessagesState, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import Send

class Analyst(BaseModel):
    """One analyst persona; `persona` renders it for use in prompts."""

    affiliation: str = Field(
        description="Primary affiliation of the analyst.",
    )
    name: str = Field(
        description="Name of the analyst."
    )
    role: str = Field(
        description="Role of the analyst in the context of the topic.",
    )
    description: str = Field(
        description="Description of the analyst focus, concerns, and motives.",
    )

    @property
    def persona(self) -> str:
        return (
            f"Name: {self.name}\n"
            f"Role: {self.role}\n"
            f"Affiliation: {self.affiliation}\n"
            f"Description: {self.description}\n"
        )

class Perspectives(BaseModel):
    analysts: List[Analyst] = Field(
        description="Comprehensive list of analysts with their roles and affiliations.",
    )

class GenerateAnalystsState(TypedDict):
    topic: str
    max_analysts: int
    analysts: List[Analyst]

class InterviewState(MessagesState):
    max_num_turns: int
    context: Annotated[list, add]  # retrieved documents, merged across updates
    analyst: Analyst
    interview: str  # full transcript, set by save_interview
    sections: list

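# With Annotated[list, add], LangGraph merges successive "context" updates by
# list concatenation instead of overwriting, e.g. (illustrative values):
#
#   update 1: {"context": ["<Document A/>"]}
#   update 2: {"context": ["<Document B/>"]}
#   merged:   state["context"] == ["<Document A/>", "<Document B/>"]
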
class SearchQuery(BaseModel):
    search_query: str = Field(None, description="Search query for retrieval.")

def create_analysts(state: GenerateAnalystsState):
    """Generate a set of analyst personas for the topic."""

    topic = state["topic"]
    max_analysts = state["max_analysts"]

    # Enforce the Perspectives schema on the model's output.
    structured_llm = llm.with_structured_output(Perspectives)

    analyst_instructions = """You are tasked with creating a set of AI analyst personas. Follow these instructions carefully:

1. First, review the research topic: {topic}

2. Create {max_analysts} analysts with the following roles:
    - Industry expert
    - GenAI expert
    - Business strategist

3. Determine the most interesting themes based upon the research topic above.

4. Pick the top {max_analysts} themes.

5. For each theme, create one analyst with ALL of the following required fields:
    - name: A fitting name for the analyst
    - role: Their specific role or title
    - affiliation: Their primary organization or institution
    - description: A detailed description of their focus areas, concerns, and motives

6. Ensure every analyst includes all four fields without exception.

Remember: Every analyst **MUST** have all four fields (name, role, affiliation, and description) properly defined. Incomplete personas are not acceptable."""

    system_message = analyst_instructions.format(topic=topic, max_analysts=max_analysts)

    analysts = structured_llm.invoke(
        [SystemMessage(content=system_message)]
        + [HumanMessage(content="Generate the set of analysts.")]
    )

    return {"analysts": analysts.analysts}

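# A minimal smoke test for the node above (the topic string is an illustrative
# assumption; it requires `llm` to be configured in setup.py):
#
#   out = create_analysts({"topic": "GenAI in logistics", "max_analysts": 3})
#   for analyst in out["analysts"]:
#       print(analyst.persona)
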
def vectorstore_writing(doc_splits):
    """Index document chunks into Chroma and expose a module-level retriever."""
    global retriever
    vectorstore = Chroma.from_documents(
        documents=doc_splits,
        collection_name="rag-chroma",
        embedding=JinaEmbeddings(model_name="jina-embeddings-v3"),
        persist_directory="./chroma_db",
    )
    retriever = vectorstore.as_retriever()

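# `doc_splits` is expected to be a list of LangChain Document chunks. A sketch of
# one way to produce it (the PDF path and chunk sizes are assumptions, not part
# of this module):
#
#   from langchain_community.document_loaders import PyPDFLoader
#   from langchain_text_splitters import RecursiveCharacterTextSplitter
#
#   docs = PyPDFLoader("assistant/docs/llama3_1.pdf").load()
#   splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
#   vectorstore_writing(splitter.split_documents(docs))
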
def generate_question(state: InterviewState):
    """Generate the analyst's next interview question."""

    analyst = state["analyst"]
    messages = state["messages"]

    question_instructions = """You are an analyst tasked with interviewing an expert to learn about the use of Generative AI (GenAI) applications in a specific industry or company, if mentioned.

Here is your persona: {goals}

Your goal is to uncover interesting and specific insights related to the topic of Generative AI use cases.

Interesting: Insights that are surprising, non-obvious, or reveal unique applications of GenAI in the industry or company.
Specific: Insights that avoid generalities and include specific examples or case studies relevant to the company's offerings, strategic focus areas, or the industry's needs.

Focus Areas:
- Explore the company's key offerings and strategic focus areas (e.g., operations, supply chain, customer experience), if the company is named.
- Discuss industry-wide trends, innovations, and opportunities enabled by GenAI, such as improved operational efficiency, enhanced customer experiences, or streamlined supply chain processes.
- Gather details on the company's or industry's vision and products, focusing on how GenAI can be applied to enhance or transform their workflows.

Task:
Begin by introducing yourself with a name that fits your persona, then ask your question.

Continue asking follow-up questions to drill down into:
- Specific GenAI use cases within the company's domain or the industry.
- How these applications align with the company's or industry's strategic goals.
- Real-world examples or future opportunities for integrating GenAI into their processes.

Complete the interview by saying:
"Thank you so much for your help!"

Remember to stay in character throughout the conversation, reflecting your persona and the provided goals."""

    # Inject the persona so the "provided goals" the prompt refers to actually
    # exist, and pass the running conversation so follow-up questions build on
    # the expert's earlier answers.
    system_message = question_instructions.format(goals=analyst.persona)
    question = llm.invoke([SystemMessage(content=system_message)] + messages)

    return {"messages": [question]}

def search_vectorstore(state: InterviewState):
    """Retrieve documents from the vector store for the analyst's latest question."""

    search_instructions = SystemMessage(content="""You will be given a conversation between an analyst and an expert.

Your goal is to generate a well-structured query for use in retrieval and / or web-search related to the conversation.

First, analyze the full conversation.

Pay particular attention to the final question posed by the analyst.

Convert this final question into a well-structured web search query.""")

    # Have the model distill the conversation into a single search query.
    structured_llm = llm.with_structured_output(SearchQuery)
    search_query = structured_llm.invoke([search_instructions] + state["messages"])

    # `retriever` is set globally by vectorstore_writing().
    search_docs = retriever.invoke(input=search_query.search_query)

    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )

    return {"context": [formatted_search_docs]}

def generate_answer(state: InterviewState):
    """Answer the analyst's question as the expert, citing the retrieved context."""

    analyst = state["analyst"]
    messages = state["messages"]
    context = state["context"]

    answer_instructions = """You are an expert being interviewed by an analyst.

Here is the analyst's area of focus: {goals}.

Your goal is to answer a question posed by the interviewer.

To answer the question, use this context:

{context}

When answering questions, follow these guidelines:

1. Use only the information provided in the context.

2. Do not introduce external information or make assumptions beyond what is explicitly stated in the context.

3. The context contains sources at the top of each individual document.

4. Include these sources in your answer next to any relevant statements. For example, for source # 1 use [1].

5. List your sources in order at the bottom of your answer. [1] Source 1, [2] Source 2, etc.

6. If the source is: <Document source="assistant/docs/llama3_1.pdf" page="7"/> then just list:

[1] assistant/docs/llama3_1.pdf, page 7

And skip the addition of the brackets as well as the Document source preamble in your citation."""

    system_message = answer_instructions.format(goals=analyst.persona, context=context)
    answer = llm.invoke([SystemMessage(content=system_message)] + messages)

    # Name the message so route_messages can count the expert's turns.
    answer.name = "expert"

    return {"messages": [answer]}

def save_interview(state: InterviewState):
    """Flatten the message history into a single interview transcript."""

    messages = state["messages"]
    interview = get_buffer_string(messages)

    return {"interview": interview}

def route_messages(state: InterviewState,
                   name: str = "expert"):
    """Route between asking another question and saving the interview."""

    messages = state["messages"]
    max_num_turns = state.get("max_num_turns", 2)

    # Count how many times the expert has answered so far.
    num_responses = len(
        [m for m in messages if isinstance(m, AIMessage) and m.name == name]
    )

    if num_responses >= max_num_turns:
        return "save_interview"

    # The last message is the expert's answer, so the analyst's most recent
    # question is the one before it.
    last_question = messages[-2]

    if "Thank you so much for your help" in last_question.content:
        return "save_interview"
    return "ask_question"

def write_section(state: InterviewState):
    """Write a report section from the interview's retrieved context."""

    interview = state["interview"]
    context = state["context"]
    analyst = state["analyst"]

    section_writer_instructions = """You are an expert technical writer.

Your task is to create a short, easily digestible section of a report based on a set of source documents.

1. Analyze the content of the source documents:
- The name of each source document is at the start of the document, with the <Document tag.

2. Create a report structure using markdown formatting:
- Use ## for the section title
- Use ### for sub-section headers

3. Write the report following this structure:
a. Title (## header)
b. Summary (### header)
c. Sources (### header)

4. Make your title engaging based upon the focus area of the analyst:
{focus}

5. For the summary section:
- Set up the summary with general background / context related to the focus area of the analyst
- Emphasize what is novel, interesting, or surprising about insights gathered from the interview
- Create a numbered list of source documents as you use them
- Do not mention the names of interviewers or experts
- Aim for approximately 400 words maximum
- Use numbered sources in your report (e.g., [1], [2]) based on information from source documents

6. In the Sources section:
- Include all sources used in your report
- Provide full links to relevant websites or specific document paths
- Separate each source by a newline. Use two spaces at the end of each line to create a newline in Markdown.
- It will look like:

### Sources
[1] Link or Document name
[2] Link or Document name

7. Be sure to combine sources. For example this is not correct:

[3] https://ai.meta.com/blog/meta-llama-3-1/
[4] https://ai.meta.com/blog/meta-llama-3-1/

There should be no redundant sources. It should simply be:

[3] https://ai.meta.com/blog/meta-llama-3-1/

8. Final review:
- Ensure the report follows the required structure
- Include no preamble before the title of the report
- Check that all guidelines have been followed"""

    system_message = section_writer_instructions.format(focus=analyst.description)
    section = llm.invoke(
        [SystemMessage(content=system_message)]
        + [HumanMessage(content=f"Use this source to write your section: {context}")]
    )

    return {"sections": [section.content]}

# Interview sub-graph: ask -> retrieve -> answer, looping until route_messages
# decides to save the interview and write the section.
interview_builder = StateGraph(InterviewState)
interview_builder.add_node("ask_question", generate_question)
interview_builder.add_node("search_rag", search_vectorstore)
interview_builder.add_node("answer_question", generate_answer)
interview_builder.add_node("save_interview", save_interview)
interview_builder.add_node("write_section", write_section)

interview_builder.add_edge(START, "ask_question")
interview_builder.add_edge("ask_question", "search_rag")
interview_builder.add_edge("search_rag", "answer_question")
interview_builder.add_conditional_edges("answer_question", route_messages, ["ask_question", "save_interview"])
interview_builder.add_edge("save_interview", "write_section")
interview_builder.add_edge("write_section", END)

memory = MemorySaver()
interview_graph = interview_builder.compile(checkpointer=memory).with_config(run_name="Conduct Interviews")

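# To exercise the interview sub-graph on its own (analyst fields, question text,
# and thread id below are illustrative assumptions; vectorstore_writing() must
# have been called first so `retriever` exists):
#
#   analyst = Analyst(
#       name="Ava Chen",
#       role="GenAI expert",
#       affiliation="Example Research Lab",
#       description="Focus on applied GenAI in industry workflows.",
#   )
#   thread = {"configurable": {"thread_id": "interview-1"}}
#   interview_graph.invoke(
#       {"analyst": analyst,
#        "messages": [HumanMessage(content="So you said you were writing an article on GenAI?")],
#        "max_num_turns": 2},
#       thread,
#   )
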
class ResearchGraphState(TypedDict):
    topic: str
    max_analysts: int
    analysts: List[Analyst]
    sections: Annotated[list, add]  # gathered from the parallel interview runs
    introduction: str
    content: str
    conclusion: str
    final_report: str
    human_analyst_feedback: Optional[str]

def initiate_all_interviews(state: ResearchGraphState):
    """The "map" step: run each interview sub-graph using the Send API."""

    # If the human left feedback, loop back and regenerate the analysts.
    human_analyst_feedback = state.get("human_analyst_feedback")
    if human_analyst_feedback:
        return "create_analysts"

    # Otherwise kick off one interview per analyst, each with its own opening message.
    topic = state["topic"]
    return [
        Send(
            "conduct_interview",
            {
                "analyst": analyst,
                "messages": [
                    HumanMessage(content=f"So you said you were writing an article on {topic}?")
                ],
            },
        )
        for analyst in state["analysts"]
    ]

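# initiate_all_interviews is used as the router for a conditional edge (see
# usecase_agent_func below): returning "create_analysts" loops back for another
# round of persona generation, while returning a list of Send objects fans out
# one "conduct_interview" sub-graph run per analyst in parallel.
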
report_writer_instructions = '''You are a technical writer tasked with creating a report on the overall topic:

**{topic}**

Your team of analysts has conducted interviews and written memos based on their findings. Your task is to consolidate the insights from these memos into a cohesive and structured report, following this format:

Think deeply and generate at least 12 use cases based on the memos.

### Format for Each Use Case
1. **Title Header:** Use a descriptive title for each use case, such as "## Use Case 1: AI-Powered Predictive Maintenance."
2. **Objective/Use Case:** Summarize the primary goal or application of AI for this use case in one or two sentences.
3. **AI Application:** Describe the specific AI technologies or methods used to achieve the objective.
4. **Cross-Functional Benefit:** Outline the key benefits across various functions, formatted as bullet points, specifying which department or area benefits from the AI use case.

### Example Format:

## Use Case 1: AI-Powered Predictive Maintenance
**Objective/Use Case:** Reduce equipment downtime and maintenance costs by predicting equipment failures before they occur.
**AI Application:** Implement machine learning algorithms that analyze real-time sensor data from machinery to predict potential failures and schedule maintenance proactively.
**Cross-Functional Benefit:**
- **Operations & Maintenance:** Minimizes unplanned downtime and extends equipment lifespan.
- **Finance:** Reduces maintenance costs and improves budgeting accuracy.
- **Supply Chain:** Optimizes spare parts inventory based on predictive insights.

## Use Case 2: Real-Time Quality Control with Computer Vision
**Objective/Use Case:** Enhance product quality by detecting defects in products during manufacturing.
**AI Application:** Deploy AI-powered computer vision systems on production lines to identify surface defects and inconsistencies in real time.
**Cross-Functional Benefit:**
- **Quality Assurance:** Improves defect detection accuracy and reduces scrap rates.
- **Production:** Enables immediate corrective actions, enhancing overall efficiency.
- **Customer Satisfaction:** Delivers higher-quality products, strengthening client relationships.

### Report Guidelines
1. Begin with the first use case title in the specified format.
2. Do not include any preamble or introductory text for the report.
3. Consolidate insights into distinct use cases, with a focus on clarity and relevance.
4. Preserve any citations included in the memos, formatted in brackets, e.g., [1], [2].
5. After detailing all use cases, include a **Sources** section with the title: `## Sources`.
6. Be sure to combine sources. For example this is not correct:

[3] https://ai.meta.com/blog/meta-llama-3-1/
[4] https://ai.meta.com/blog/meta-llama-3-1/

There should be no redundant sources. It should simply be:

[3] https://ai.meta.com/blog/meta-llama-3-1/

### Your Inputs
You will be given a collection of memos from your analysts under `{context}`. Extract and distill insights into specific use cases, ensuring each use case adheres to the prescribed format.'''

def write_report(state: ResearchGraphState):
    """Consolidate the section memos into the body of the report."""

    sections = state["sections"]
    topic = state["topic"]

    formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

    system_message = report_writer_instructions.format(topic=topic, context=formatted_str_sections)
    report = llm.invoke(
        [SystemMessage(content=system_message)]
        + [HumanMessage(content="Write a report based upon these memos.")]
    )
    return {"content": report.content}

def human_feedback(state: ResearchGraphState):
    """No-op node that should be interrupted on."""
    pass

# Shared prompt for the introduction and conclusion writers.
intro_conclusion_instructions = """You are a technical writer finishing a report on {topic}.

You will be given all of the sections of the report.

Your job is to write a crisp and compelling introduction or conclusion section.

The user will instruct you whether to write the introduction or conclusion.

Include no preamble for either section.

Target around 100 words, crisply previewing (for the introduction) or recapping (for the conclusion) all of the sections of the report.

Use markdown formatting.

For your introduction, create a compelling title and use the # header for the title.

For your introduction, use ## Introduction as the section header.

For your conclusion, use ## Conclusion as the section header.

Here are the sections to reflect on for writing: {formatted_str_sections}"""


def write_introduction(state: ResearchGraphState):
    """Write the report introduction from the gathered sections."""

    sections = state["sections"]
    topic = state["topic"]

    formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

    instructions = intro_conclusion_instructions.format(topic=topic, formatted_str_sections=formatted_str_sections)
    intro = llm.invoke([SystemMessage(content=instructions)] + [HumanMessage(content="Write the report introduction")])
    return {"introduction": intro.content}

def write_conclusion(state: ResearchGraphState):
    """Write the report conclusion from the gathered sections."""

    sections = state["sections"]
    topic = state["topic"]

    formatted_str_sections = "\n\n".join([f"{section}" for section in sections])

    instructions = intro_conclusion_instructions.format(topic=topic, formatted_str_sections=formatted_str_sections)
    conclusion = llm.invoke([SystemMessage(content=instructions)] + [HumanMessage(content="Write the report conclusion")])
    return {"conclusion": conclusion.content}

def finalize_report(state: ResearchGraphState):
    """The "reduce" step: gather all sections, combine them with the intro and conclusion, and re-attach sources."""

    content = state["content"]
    # Drop a leading "## Insights" header if the model added one. (str.strip()
    # removes characters, not a prefix, so removeprefix is used instead.)
    content = content.removeprefix("## Insights")

    if "## Sources" in content:
        try:
            content, sources = content.split("\n## Sources\n")
        except ValueError:
            sources = None
    else:
        sources = None

    final_report = state["introduction"] + "\n\n---\n\n" + content + "\n\n---\n\n" + state["conclusion"]
    if sources is not None:
        final_report += "\n\n## Sources\n" + sources
    return {"final_report": final_report}

def usecase_agent_func(topic, max_analysts):
    """Build and run the full research graph, returning the final report."""

    builder = StateGraph(ResearchGraphState)
    builder.add_node("create_analysts", create_analysts)
    builder.add_node("human_feedback", human_feedback)
    builder.add_node("conduct_interview", interview_builder.compile())
    builder.add_node("write_report", write_report)
    builder.add_node("write_introduction", write_introduction)
    builder.add_node("write_conclusion", write_conclusion)
    builder.add_node("finalize_report", finalize_report)

    builder.add_edge(START, "create_analysts")
    builder.add_edge("create_analysts", "human_feedback")
    builder.add_conditional_edges("human_feedback", initiate_all_interviews, ["create_analysts", "conduct_interview"])
    builder.add_edge("conduct_interview", "write_report")
    builder.add_edge("conduct_interview", "write_introduction")
    builder.add_edge("conduct_interview", "write_conclusion")
    builder.add_edge(["write_conclusion", "write_report", "write_introduction"], "finalize_report")
    builder.add_edge("finalize_report", END)

    memory = MemorySaver()
    graph = builder.compile(checkpointer=memory)
    config = {"configurable": {"thread_id": "1"}}
    graph.invoke(
        {"topic": topic,
         "max_analysts": max_analysts,
         "human_analyst_feedback": None},
        config,
    )
    final_state = graph.get_state(config)
    report = final_state.values.get("final_report")

    print("-----REPORT-----", report)

    return report

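# Example entry point (the topic below is an illustrative assumption):
#
#   if __name__ == "__main__":
#       usecase_agent_func("Generative AI in supply chain management", max_analysts=3)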