# -*- coding: utf-8 -*-
"""CaseStudyQA
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1j93Wywxt8UHwUpQwutRRnW1qKRUKj853
## Setup
"""
import os
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# Commented out IPython magic to ensure Python compatibility.
# %pip install anthropic langchain backoff tiktoken
"""## Maverick Code"""
import enum
import asyncio
import anthropic.api as anthropic_api
import math
import langchain.schema as llm_schema
class Roles(enum.Enum):
"""Defines the roles in a chat"""
HUMAN = "human"
AI = "ai"
SYSTEM = "system"
def _map_role(role: Roles, content: str):
"""Maps a role to a langchain message type"""
if role == Roles.HUMAN:
return llm_schema.HumanMessage(content=content)
elif role == Roles.AI:
return llm_schema.AIMessage(content=content)
elif role == Roles.SYSTEM:
return llm_schema.SystemMessage(content=content)
else:
return llm_schema.ChatMessage(content=content, role=role.value)
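# For example, _map_role(Roles.HUMAN, "Hello") yields
# llm_schema.HumanMessage(content="Hello"), while any unknown role falls
# back to a generic ChatMessage carrying the role's string value.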
ANTHROPIC_ERRORS_FOR_BACKOFF = (
asyncio.TimeoutError,
anthropic_api.ApiException,
)
ANTHROPIC_BACKOFF_BASE = math.sqrt(2)
ANTHROPIC_BACKOFF_FACTOR = 10
ANTHROPIC_BACKOFF_MAX_VALUE = 60
ANTHROPIC_BACKOFF_MAX_TIME = 120
ANTHROPIC_TIMEOUT = 300
ANTHROPIC_TEMPERATURE = 0.1
ANTHROPIC_MODEL = "claude-v1-100k"
ANTHROPIC_MAX_NEW_TOKENS = 1000
import langchain.chat_models as langchain_chat_models
import backoff
class ChatModel:
"""A singleton class for the chat model
Attributes:
_chat_model: the chat model instance
Methods:
instance: returns the chat model instance
"""
_chat_model = None
@staticmethod
def instance():
if ChatModel._chat_model is None:
ChatModel._chat_model = langchain_chat_models.ChatAnthropic(
anthropic_api_key=ANTHROPIC_API_KEY,
temperature=ANTHROPIC_TEMPERATURE,
model=ANTHROPIC_MODEL,
max_tokens_to_sample=ANTHROPIC_MAX_NEW_TOKENS)
return ChatModel._chat_model
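# Usage sketch: because ChatModel is a lazy singleton, repeated calls reuse
# one client instead of rebuilding it per request:
#
#   model = ChatModel.instance()          # first call constructs the client
#   assert model is ChatModel.instance()  # later calls return the same object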
# limit the number of concurrent Anthropic requests
anthropic_semaphore = asyncio.Semaphore(5)
@backoff.on_exception(backoff.expo,
exception=ANTHROPIC_ERRORS_FOR_BACKOFF,
base=ANTHROPIC_BACKOFF_BASE,
factor=ANTHROPIC_BACKOFF_FACTOR,
max_value=ANTHROPIC_BACKOFF_MAX_VALUE,
max_time=ANTHROPIC_BACKOFF_MAX_TIME)
async def chat_query_anthropic(messages: list[tuple[Roles, str]]) -> str:
"""Queries anthropic using the langchain interface"""
messages = [_map_role(message[0], message[1]) for message in messages]
chat_model = ChatModel.instance()
async with anthropic_semaphore:
response = await asyncio.wait_for(
chat_model.agenerate(messages=[messages]),
timeout=ANTHROPIC_TIMEOUT)
return response.generations[0][0].text
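# Usage sketch (hypothetical conversation; requires ANTHROPIC_API_KEY):
#
#   answer = asyncio.run(chat_query_anthropic([
#       (Roles.SYSTEM, "You answer questions about sales calls."),
#       (Roles.HUMAN, "Summarise the last call in one sentence."),
#   ]))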
import langchain.embeddings.base as base_embeddings
import langchain.vectorstores.base as base_vc
import numpy as np
from langchain.docstore.document import Document
class NumpyVectorDB(base_vc.VectorStore):
"""Basic vector db implemented using numpy etc."""
def __init__(self, embeddings: base_embeddings.Embeddings,
embedding_dim: int) -> None:
self._embedder = embeddings
self._embedding_matrix: np.ndarray = np.zeros((0, embedding_dim))
self._keys: set[str] = set()
self._attr: dict[str, list] = {}
self._size: int = 0
self._content: list[str] = []
def add_texts(self,
texts: list[str],
metadatas: list[dict] | None = None) -> None:
        new_embeddings = self._embedder.embed_documents(texts)
        # metadatas is declared optional but iterated below, so normalise None
        if metadatas is None:
            metadatas = [{} for _ in texts]
        new_size = self._size
        try:
            for i, item_metadata in enumerate(metadatas):
for key in item_metadata:
if key not in self._keys:
self._keys.add(key)
self._attr[key] = [None] * new_size
self._attr[key] = self._attr[key] + [item_metadata[key]]
for key in self._keys:
if key not in item_metadata:
self._attr[key] = self._attr[key] + [None]
self._content.append(texts[i])
new_size += 1
self._embedding_matrix = np.concatenate(
[self._embedding_matrix, new_embeddings])
self._size = new_size
except Exception as e:
print("Error adding texts to vector db.")
for key in self._keys:
self._attr[key] = self._attr[key][:self._size]
self._content = self._content[:self._size]
self._embedding_matrix = self._embedding_matrix[:self._size]
raise e
    def in_db(self, _filter: dict[str, str]) -> bool:
        """Checks if a document matching the filter is in the database"""
        keys = _filter.keys()
        for key in keys:
            if key not in self._keys:
                print("Key not in database.")
                return False
        # one row of one-hots per filter key
        one_hots = np.array(
            [np.equal(self._attr[key], _filter[key]) for key in keys])
        if one_hots.size == 0:
            print("No one_hots found.")
            return False
        # a document matches iff it matches on every filter key
        one_hot = np.prod(one_hots, axis=0)
        # check if any document matches
        return bool(np.any(one_hot))
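    # Usage sketch (hypothetical values): with one document stored under
    # metadata {"url": "https://example.com"},
    #   db.in_db({"url": "https://example.com"})  -> True
    #   db.in_db({"url": "https://other.com"})    -> False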
def similarity_search(
self,
query: str,
k: int = 10,
# filter is a reserved keyword, but is required
# due to langchain's interface
# pylint: disable=redefined-builtin
filter: dict | None = None,
# pylint: enable=redefined-builtin
) -> list[Document]:
"""
k: Number of Documents to return.
Defaults to 4.
filter_: Attribute filter by metadata example {'key': 'value'}.
Defaults to None.
"""
        query_embedding = self._embedder.embed_query(query)
        distances = np.linalg.norm(self._embedding_matrix - query_embedding,
                                   axis=1,
                                   ord=2)
        # normalize
        distances -= np.min(distances)
        # filter: non-matching rows get infinite distance so they can never
        # be returned as nearest neighbours
        if filter is not None:
            for key in filter:
                mask = np.array(self._attr[key]) == filter[key]
                distances = np.where(mask, distances, np.inf)
        # top k indices
        if k >= len(distances):
            sorted_indices = np.arange(len(distances))
        else:
            sorted_indices = np.argpartition(distances, k)[:k]
# return
return [
Document(page_content=self._content[i],
metadata={key: self._attr[key][i]
for key in self._keys})
for i in sorted_indices[:k]
]
@staticmethod
def from_texts(**kwargs):
raise NotImplementedError
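# Minimal usage sketch for NumpyVectorDB (hypothetical texts; the instance
# below uses OpenAI embeddings, whose vectors have 1536 dimensions):
#
#   db = NumpyVectorDB(embeddings, EMBEDDING_DIM)
#   db.add_texts(["Acme cut time-to-hire by 30%."],
#                [{"url": "https://example.com/acme"}])
#   docs = db.similarity_search("time to hire", k=1,
#                               filter={"url": "https://example.com/acme"})
#   docs[0].page_content  # -> "Acme cut time-to-hire by 30%."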
EMBEDDING_DIM = 1536
import langchain.docstore.document as lc_document_models
import langchain.embeddings as lc_embeddings
import langchain.embeddings.base as base_embeddings
import langchain.text_splitter as lc_text_splitter
embeddings = lc_embeddings.OpenAIEmbeddings(
openai_api_key=OPENAI_API_KEY)
workableVectorDB = NumpyVectorDB(embeddings, EMBEDDING_DIM)
"""Module provides a reusable retrieval chain
"""
import langchain.docstore.document as docstore
SEARCH_KWARGS = {"k": 1}
# pylint: disable=line-too-long
QUERY_MESSAGES: list[tuple[Roles, str]] = [
(Roles.HUMAN, "Hello"),
(Roles.SYSTEM, ""),
(Roles.AI,
"Hi I am Mnemosyne, a question answering system built by Glyphic. " +
"I have access to a series of sales calls from a deal and can retrieve the most relevant "
+
"parts of the call for you, and then answer the question. What would you like to know?"
),
(Roles.HUMAN, "Great let me think about that for a second.")
]
# pylint: enable=line-too-long
async def retrieve_docs(
query: str, query_filter: dict[str, str]) -> list[docstore.Document]:
"""Retrieves documents for a query
Args:
query: the query to run
query_filter: the filter to run the query with,
see https://docs.activeloop.ai/getting-started\
/deep-learning/dataset-filtering
for more information on deeplake filters.
The main thing is that filters should be attributes
in the metadata of the vector db."""
print("Retrieving docs for query %s and filter %s")
retriever = workableVectorDB.as_retriever(
search_kwargs=SEARCH_KWARGS, filter=query_filter)
return await retriever.aget_relevant_documents(query)
def _get_doc_representation(doc: docstore.Document) -> str:
metadata = doc.metadata
content = doc.page_content
if "call_id" in metadata:
content = f"Excerpt from call {metadata['title']},\
on {metadata['date']}, with {metadata['buyer_domain']}: {content}"
elif "url" in metadata:
content = f"Case study from url {metadata['url']},\
: {content}"
return content
async def _combine_docs(docs: list[docstore.Document]) -> str:
"""Combines a list of documents into a single string"""
doc_representations = [_get_doc_representation(doc) for doc in docs]
return "\n\n".join(doc_representations)
async def answer_question(question: str, docs: str):
"""Answers a question given a query and a list of documents"""
messages = QUERY_MESSAGES.copy()
messages += [(Roles.HUMAN, question),
(Roles.SYSTEM,
f"Here are the documents I found:\n\n{docs}\n\n"),
(Roles.SYSTEM,
f"Now reply to the question: {question}.\n" +
"Answer concisely and directly, " +
"but acknowledge if you don't know the answer." +
"Some information may be from earlier parts of the deal " +
"and may be irrelevant to the question. " +
"The user will be unable to ask follow up questions.")]
return await chat_query_anthropic(messages)
async def run_query(query: str, query_filter: dict[str, str]) -> tuple[str, str]:
"""Runs a query on the retrieval chain
Args:
query: the query to run
query_filter: the filter to run the query with,
see https://docs.activeloop.ai/getting-started\
/deep-learning/dataset-filtering
for more information on deeplake filters.
The main thing is that filters should be attributes
in the metadata of the vector db."""
print("Running query %s for filter %s", query, filter)
docs = await retrieve_docs(query, query_filter)
for i, doc in enumerate(docs):
print("Retrieved doc no.%d\n%s", i, doc.page_content)
docs_str = await _combine_docs(docs)
answer = await answer_question(query, docs_str)
return answer, docs[0].metadata["url"]
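# Usage sketch (hypothetical question; assumes the vector DB has been
# populated with case studies, as done below):
#
#   answer, source_url = asyncio.run(
#       run_query("Why did the customer switch to Workable?", {}))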
"""## Scraping"""
workable_urls = [
"https://resources.workable.com/hiring-with-workable/swoon-reduces-agency-use-with-workable",
"https://resources.workable.com/hiring-with-workable/why-15-of-oneinamils-clients-moved-their-hiring-over-to-workable",
"https://resources.workable.com/backstage/workable-named-top-rated-ats-by-trustradius-for-2019"
]
import requests
from bs4 import BeautifulSoup
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
}
PAGES = [
"https://resources.workable.com/tag/customer-stories/",
"https://resources.workable.com/tag/customer-stories/page/2/",
"https://resources.workable.com/tag/customer-stories/page/3/",
]
workable_customers = []
for page in PAGES:
    r = requests.get(page, headers=headers)
soup = BeautifulSoup(r.content, 'html.parser')
for link in soup.find_all("a", href=True):
href = link["href"]
if href.startswith("https://resources.workable.com/hiring-with-workable/"):
workable_customers.append(href)
def get_paragraphs_workable(url):
r = requests.get(url=url, headers=headers)
soup = BeautifulSoup(r.content, 'html.parser')
target_p = []
    # traverse paragraphs from soup to get text from the target and add to arr
for data in soup.find_all("p"):
text = data.get_text()
if len(text) > 3:
target_p.append(text.strip())
return target_p
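# Usage sketch: pulling the paragraph texts from one scraped customer story:
#
#   paragraphs = get_paragraphs_workable(workable_customers[0])
#   paragraphs[:2]  # first couple of <p> texts on the page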
def clean_text(text):
text = text.replace("\n\n", "\n")
text = text.replace("\t\t", "\t")
text = text.replace("\r", " ")
    text = text.replace("  ", " ")
return text
def loop(text):
    """Re-applies clean_text until the text stops changing (fixpoint)"""
    prev = ""
    while prev != text:
        prev = text
        text = clean_text(text)
    return text
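# Example: a single clean_text pass can leave residue ("a \r b" becomes
# "a  b"), so loop() re-applies it until nothing changes:
#
#   loop("a \r b")  # -> "a b"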
workable_case_studies = []
# for customer in customers:
# TODO(fix)
for customer in workable_customers:
    url = customer
    # The first and last few paragraphs are boilerplate, so strip them
    workable_case_studies.append(
        (url, loop('<join>'.join(get_paragraphs_workable(customer)[4:][:-4]))))
# TODO: Some additional filtering is still needed, especially towards the end;
# we should probably discard things that are not in the main body.
"""## App logic"""
for (url, case_study) in workable_case_studies:
workableVectorDB.add_texts([case_study], [{"url": url}])
import gradio as gr
API_KEY = os.environ.get("API_KEY")
def authenticate(api_key):
# Check if the provided API key matches the expected key
return api_key == API_KEY
def get_answer(question, api_key):
# Authenticate the API key
if not authenticate(api_key):
return "Invalid API key. Access denied."
    # Run the retrieval and question-answering pipeline
    response = asyncio.run(run_query(question, query_filter={}))
return f"{response[0]} \n\nSource: {response[1]}"
# Create a Gradio interface
iface = gr.Interface(
fn=get_answer,
inputs=["text", gr.inputs.Textbox(label="API Key")],
outputs="text",
title="Question Answering App",
description="Enter a question and API key to get an answer.",
theme="default",
layout="vertical"
)
# Launch the Gradio interface
iface.launch()