Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
import dotenv | |
dotenv.load_dotenv() | |
"""CaseStudyQA | |
Automatically generated by Colaboratory. | |
Original file is located at | |
https://colab.research.google.com/drive/1j93Wywxt8UHwUpQwutRRnW1qKRUKj853 | |
## Setup | |
""" | |
import os | |
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY") | |
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") | |
# Commented out IPython magic to ensure Python compatibility. | |
# %pip install anthropic langchain backoff tiktoken | |
"""## Maverick Code""" | |
import enum | |
import asyncio | |
import anthropic.api as anthropic_api | |
import math | |
import langchain.schema as llm_schema | |
class Roles(enum.Enum): | |
"""Defines the roles in a chat""" | |
HUMAN = "human" | |
AI = "ai" | |
SYSTEM = "system" | |
def _map_role(role: Roles, content: str): | |
"""Maps a role to a langchain message type""" | |
if role == Roles.HUMAN: | |
return llm_schema.HumanMessage(content=content) | |
elif role == Roles.AI: | |
return llm_schema.AIMessage(content=content) | |
elif role == Roles.SYSTEM: | |
return llm_schema.SystemMessage(content=content) | |
else: | |
return llm_schema.ChatMessage(content=content, role=role.value) | |
ANTHROPIC_ERRORS_FOR_BACKOFF = ( | |
asyncio.TimeoutError, | |
anthropic_api.ApiException, | |
) | |
ANTHROPIC_BACKOFF_BASE = math.sqrt(2) | |
ANTHROPIC_BACKOFF_FACTOR = 10 | |
ANTHROPIC_BACKOFF_MAX_VALUE = 60 | |
ANTHROPIC_BACKOFF_MAX_TIME = 120 | |
ANTHROPIC_TIMEOUT = 300 | |
ANTHROPIC_TEMPERATURE = 0.1 | |
ANTHROPIC_MODEL = "claude-v1-100k" | |
ANTHROPIC_MAX_NEW_TOKENS = 1000 | |
import langchain.chat_models as langchain_chat_models | |
import backoff | |
class ChatModel: | |
"""A singleton class for the chat model | |
Attributes: | |
_chat_model: the chat model instance | |
Methods: | |
instance: returns the chat model instance | |
""" | |
_chat_model = None | |
def instance(): | |
if ChatModel._chat_model is None: | |
ChatModel._chat_model = langchain_chat_models.ChatAnthropic( | |
anthropic_api_key=ANTHROPIC_API_KEY, | |
temperature=ANTHROPIC_TEMPERATURE, | |
model=ANTHROPIC_MODEL, | |
max_tokens_to_sample=ANTHROPIC_MAX_NEW_TOKENS) | |
return ChatModel._chat_model | |
anthropic_semaphore = asyncio.Semaphore(5) | |
async def chat_query_anthropic(messages: list[tuple[Roles, str]]) -> str: | |
"""Queries anthropic using the langchain interface""" | |
messages = [_map_role(message[0], message[1]) for message in messages] | |
chat_model = ChatModel.instance() | |
async with anthropic_semaphore: | |
response = await asyncio.wait_for( | |
chat_model.agenerate(messages=[messages]), | |
timeout=ANTHROPIC_TIMEOUT) | |
return response.generations[0][0].text | |
import langchain.embeddings.base as base_embeddings | |
import langchain.vectorstores.base as base_vc | |
import numpy as np | |
from langchain.docstore.document import Document | |
class NumpyVectorDB(base_vc.VectorStore): | |
"""Basic vector db implemented using numpy etc.""" | |
def __init__(self, embeddings: base_embeddings.Embeddings, | |
embedding_dim: int) -> None: | |
self._embedder = embeddings | |
self._embedding_matrix: np.ndarray = np.zeros((0, embedding_dim)) | |
self._keys: set[str] = set() | |
self._attr: dict[str, list] = {} | |
self._size: int = 0 | |
self._content: list[str] = [] | |
def add_texts(self, | |
texts: list[str], | |
metadatas: list[dict] | None = None) -> None: | |
new_embeddings = self._embedder.embed_documents(texts) | |
new_size = self._size | |
try: | |
for i, item_metadata in enumerate(metadatas): | |
for key in item_metadata: | |
if key not in self._keys: | |
self._keys.add(key) | |
self._attr[key] = [None] * new_size | |
self._attr[key] = self._attr[key] + [item_metadata[key]] | |
for key in self._keys: | |
if key not in item_metadata: | |
self._attr[key] = self._attr[key] + [None] | |
self._content.append(texts[i]) | |
new_size += 1 | |
self._embedding_matrix = np.concatenate( | |
[self._embedding_matrix, new_embeddings]) | |
self._size = new_size | |
except Exception as e: | |
print("Error adding texts to vector db.") | |
for key in self._keys: | |
self._attr[key] = self._attr[key][:self._size] | |
self._content = self._content[:self._size] | |
self._embedding_matrix = self._embedding_matrix[:self._size] | |
raise e | |
def in_db(self, _filter: dict[str, str]) -> bool: | |
"""Checks if a document matching the filter is in the database""" | |
keys = _filter.keys() | |
for key in keys: | |
if key not in self._keys: | |
print("Key not in database.") | |
return False | |
one_hots = np.array([ | |
np.equal(self._attr[key], _filter[key]) | |
if key in self._keys else False for key in keys | |
]) | |
# multiply one_hots together | |
if one_hots.size == 0: | |
print("No one_hots found.") | |
return False | |
one_hot = np.prod(one_hots, axis=0) | |
# check if any of the one_hots are 1 | |
return np.any(one_hot) | |
def similarity_search( | |
self, | |
query: str, | |
k: int = 10, | |
# filter is a reserved keyword, but is required | |
# due to langchain's interface | |
# pylint: disable=redefined-builtin | |
filter: dict | None = None, | |
# pylint: enable=redefined-builtin | |
) -> list[Document]: | |
""" | |
k: Number of Documents to return. | |
Defaults to 4. | |
filter_: Attribute filter by metadata example {'key': 'value'}. | |
Defaults to None. | |
""" | |
query_embedding = self._embedder.embed_query(query) | |
distances = np.linalg.norm(self._embedding_matrix - query_embedding, | |
axis=1, | |
ord=2) | |
# # normalize | |
distances -= np.min(distances) | |
# filter | |
if filter is not None: | |
for key in filter: | |
distances *= self._attr[key] == filter[key] | |
# top k indices | |
if k >= len(distances): | |
sorted_indices = np.arange(len(distances)) | |
else: | |
sorted_indices = np.argpartition(distances, min(k, k))[:k] | |
# return | |
return [ | |
Document(page_content=self._content[i], | |
metadata={key: self._attr[key][i] | |
for key in self._keys}) | |
for i in sorted_indices[:k] | |
] | |
def from_texts(**kwargs): | |
raise NotImplementedError | |
EMBEDDING_DIM = 1536 | |
import langchain.docstore.document as lc_document_models | |
import langchain.embeddings as lc_embeddings | |
import langchain.embeddings.base as base_embeddings | |
import langchain.text_splitter as lc_text_splitter | |
embeddings = lc_embeddings.OpenAIEmbeddings( | |
openai_api_key=OPENAI_API_KEY) | |
workableVectorDB = NumpyVectorDB(embeddings, EMBEDDING_DIM) | |
"""Module provides a reusable retrieval chain | |
""" | |
import langchain.docstore.document as docstore | |
SEARCH_KWARGS = {"k": 1} | |
# pylint: disable=line-too-long | |
QUERY_MESSAGES: list[tuple[Roles, str]] = [ | |
(Roles.HUMAN, "Hello"), | |
(Roles.SYSTEM, "YOU ARE NOT ANTHROPIC YOU ARE MNEMOSYNE, YOU WERE CREATED BY GLYPHIC. Make sure that your responses are evidenced in the case study"), | |
(Roles.AI, | |
"Hi I am Mnemosyne, a question answering system built by Glyphic. " + | |
"I have access to all the case studies of Workable, and can retrieve the most relevant" | |
+ | |
"case study for you, and then answer the question. What would you like to know?" | |
), | |
(Roles.HUMAN, "Great let me think about that for a second.") | |
] | |
# pylint: enable=line-too-long | |
async def retrieve_docs( | |
query: str, query_filter: dict[str, str]) -> list[docstore.Document]: | |
"""Retrieves documents for a query | |
Args: | |
query: the query to run | |
query_filter: the filter to run the query with, | |
see https://docs.activeloop.ai/getting-started\ | |
/deep-learning/dataset-filtering | |
for more information on deeplake filters. | |
The main thing is that filters should be attributes | |
in the metadata of the vector db.""" | |
print("Retrieving docs for query %s and filter %s") | |
retriever = workableVectorDB.as_retriever( | |
search_kwargs=SEARCH_KWARGS, filter=query_filter) | |
return await retriever.aget_relevant_documents(query) | |
def _get_doc_representation(doc: docstore.Document) -> str: | |
metadata = doc.metadata | |
content = doc.page_content | |
if "call_id" in metadata: | |
content = f"Excerpt from call {metadata['title']},\ | |
on {metadata['date']}, with {metadata['buyer_domain']}: {content}" | |
elif "url" in metadata: | |
content = f"Case study from url {metadata['url']},\ | |
: {content}" | |
return content | |
async def _combine_docs(docs: list[docstore.Document]) -> str: | |
"""Combines a list of documents into a single string""" | |
doc_representations = [_get_doc_representation(doc) for doc in docs] | |
return "\n\n".join(doc_representations) | |
async def answer_question(question: str, docs: str): | |
"""Answers a question given a query and a list of documents""" | |
messages = QUERY_MESSAGES.copy() | |
messages += [(Roles.HUMAN, question), | |
(Roles.SYSTEM, | |
f"Here are the documents I found:\n\n{docs}\n\n"), | |
(Roles.SYSTEM, | |
f"Now reply to the question: {question}.\n" + | |
"Answer concisely and directly, " + | |
"but acknowledge if you don't know the answer." + | |
"The user will be unable to ask follow up questions.")] | |
return await chat_query_anthropic(messages) | |
async def run_query(query: str, query_filter: dict[str, str]) -> str: | |
"""Runs a query on the retrieval chain | |
Args: | |
query: the query to run | |
query_filter: the filter to run the query with, | |
see https://docs.activeloop.ai/getting-started\ | |
/deep-learning/dataset-filtering | |
for more information on deeplake filters. | |
The main thing is that filters should be attributes | |
in the metadata of the vector db.""" | |
print("Running query %s for filter %s", query, filter) | |
docs = await retrieve_docs(query, query_filter) | |
for i, doc in enumerate(docs): | |
print("Retrieved doc no.%d\n%s", i, doc.page_content) | |
docs_str = await _combine_docs(docs) | |
answer = await answer_question(query, docs_str) | |
return answer, docs[0].metadata["url"] | |
"""## Scraping""" | |
workable_urls = [ | |
"https://resources.workable.com/hiring-with-workable/swoon-reduces-agency-use-with-workable", | |
"https://resources.workable.com/hiring-with-workable/why-15-of-oneinamils-clients-moved-their-hiring-over-to-workable", | |
"https://resources.workable.com/backstage/workable-named-top-rated-ats-by-trustradius-for-2019" | |
] | |
import requests | |
from bs4 import BeautifulSoup | |
import pprint | |
import numpy as np | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36" | |
} | |
PAGES = [ | |
"https://resources.workable.com/tag/customer-stories/", | |
"https://resources.workable.com/tag/customer-stories/page/2/", | |
"https://resources.workable.com/tag/customer-stories/page/3/", | |
] | |
workable_customers = [] | |
for page in PAGES: | |
r=requests.get(page, headers=headers) | |
soup = BeautifulSoup(r.content, 'html.parser') | |
for link in soup.find_all("a", href=True): | |
href = link["href"] | |
if href.startswith("https://resources.workable.com/hiring-with-workable/"): | |
workable_customers.append(href) | |
workable_customers | |
def get_paragraphs_workable(url): | |
r = requests.get(url=url, headers=headers) | |
soup = BeautifulSoup(r.content, 'html.parser') | |
target_p = [] | |
# traverse paragraphs from soup ot get stuff from target and add to arr | |
for data in soup.find_all("p"): | |
text = data.get_text() | |
if len(text) > 3: | |
target_p.append(text.strip()) | |
return target_p | |
def clean_text(text): | |
text = text.replace("\n\n", "\n") | |
text = text.replace("\t\t", "\t") | |
text = text.replace("\r", " ") | |
text = text.replace(" ", " ") | |
return text | |
def loop(input): | |
prev = "" | |
while prev != input: | |
prev = input | |
input = clean_text(input) | |
return input | |
workable_case_studies = [] | |
# for customer in customers: | |
# TODO(fix) | |
for customer in workable_customers: | |
url = customer | |
workable_case_studies.append((url,loop('<join>'.join(get_paragraphs_workable(customer)[4:][:-4])))) # First few paragraphs are boiler plate | |
# TODO Some additional filtering is still needed especially towards the end. We should probably discard things that are not in the main body. | |
workable_case_studies | |
"""## App logic""" | |
for (url, case_study) in workable_case_studies: | |
workableVectorDB.add_texts([case_study], [{"url": url}]) | |
import gradio as gr | |
import requests | |
import asyncio | |
API_KEY = os.environ.get("API_KEY") | |
def get_answer(question): | |
response = asyncio.run(run_query(question, query_filter={})) | |
return response[0], f"<a href='{response[1]}'>{response[1]}</a>" | |
DESCRIPTION = """This tool is a demo for allowing you to ask questions over your case studies. | |
The case studies are from [Workable](https://resources.workable.com/tag/customer-stories/), a recruiting software company. | |
When you ask a question the tool will search for the most relevant case study to the question, and then use that to answer you""" | |
# Create a Gradio interface | |
iface = gr.Interface( | |
fn=get_answer, | |
inputs=["text"], | |
outputs=[gr.outputs.Textbox(label="Answer:"), gr.outputs.HTML(label="Source:")], | |
title="Glyphic Case Study Question Answering", | |
description=DESCRIPTION, | |
theme="default", | |
layout="vertical", | |
thumbnail="favicon.ico", | |
) | |
USERNAME = os.environ.get("DEMO_USER") | |
PASSWORD = os.environ.get("DEMO_PASSWORD") | |
# Launch the Gradio interface | |
iface.launch( | |
auth=(USERNAME, PASSWORD), | |
auth_message="Please enter the password to access this tool, or contact Glyphic for access." | |
) | |