import uvicorn from fastapi import FastAPI, Body from fastapi.responses import StreamingResponse from queue import Queue from langchain.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import Milvus from langchain import PromptTemplate from langchain.embeddings import HuggingFaceEmbeddings from langchain.llms import LlamaCpp from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.llms import Replicate from threading import Thread import os from threading import Thread from queue import Queue, Empty from threading import Thread from collections.abc import Generator from langchain.callbacks.base import BaseCallbackHandler from typing import Any from langchain.tools import DuckDuckGoSearchRun from langchain.vectorstores import Milvus from langchain.tools import DuckDuckGoSearchRun import requests #replicate api token os.environ["REPLICATE_API_TOKEN"] = "r8_30xo4KYovs74WNJiDFmZFENUcoXUBJa1B0nat" #intialize web search wrapper search = DuckDuckGoSearchRun() #intialize emebding model embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-MiniLM-L6-v2') #milvus database connection collection_name = 'LangChainCollection' connection_args={"uri": "https://in03-48a0999a31a268c.api.gcp-us-west1.zillizcloud.com",'token':'695cbc93b8030fd34821fa3477b13d317145bcebc049ab30f95cf301bb3edbfcf7f88761f2f448881991ae89c05e5eaa5e83fc0e'} vectorstore = Milvus(connection_args=connection_args, collection_name=collection_name,embedding_function=embeddings) #downloading the model url = "https://huggingface.co./TheBloke/Llama-2-7b-Chat-GGUF/resolve/main/llama-2-7b-chat.Q5_K_M.gguf" output_file = "llama-2-7b-chat.Q5_K_M.gguf" # The filename you want to save the downloaded file as response = requests.get(url) if response.status_code == 200: with open(output_file, "wb") as file: file.write(response.content) print(f"File downloaded as {output_file}") else: print("Failed to download the file.") BASE_DIR = os.getcwd() items = os.listdir(BASE_DIR) # Print the list of items for item in items: print(item) #intialize replicate llm llm = Replicate( model="a16z-infra/llama13b-v2-chat:df7690f1994d94e96ad9d568eac121aecf50684a0b0963b25a41cc40061269e5", input={"temperature": 0.1, "max_length": 256, "top_p": 1}, ) B_INST, E_INST = "[INST]", "[/INST]" B_SYS, E_SYS = "<>\n", "\n<>\n\n" DEFAULT_SYSTEM_PROMPT_replicate = """\ You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature. If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information.""" def get_prompt(instruction, new_system_prompt=DEFAULT_SYSTEM_PROMPT_replicate ): SYSTEM_PROMPT = B_SYS + new_system_prompt + E_SYS prompt_template = B_INST + SYSTEM_PROMPT + instruction + E_INST return prompt_template instruction_replicate = "{text}" template_replicate = get_prompt(instruction_replicate,DEFAULT_SYSTEM_PROMPT_replicate) prompt_replicate = PromptTemplate(template=template_replicate,input_variables=['text']) llm_chain_Replicate = LLMChain(prompt=prompt_replicate, llm=llm) def llama2(query): try: text=query output = llm_chain_Replicate.run(text) except: pass return output def websearch(query): try: ouput=search.run(query) except: ouput='' return ouput def vectorsearch(query): try: vector=vectore=vectorstore.similarity_search( query, # our search query k=4 # return 3 most relevant docs ) output=vector[0].page_content + '\n' + vector[1].page_content +'\n' + vector[2].page_content+vector[3].page_content except: ouput='' return output class ThreadWithReturnValue(Thread): def __init__(self, group = None, target=None, name= None, args = (), kwargs = {},Verbose=None): Thread.__init__(self,group, target, name, args, kwargs) self._return = None def run(self): if self._target is not None : self._return = self._target(*self._args,**self._kwargs) def join(self,*args): Thread.join(self,*args) return self._return B_INST, E_INST = "[INST]", "[/INST]" B_SYS, E_SYS = "<>\n", "\n<>\n\n" DEFAULT_SYSTEM_PROMPT = """\ You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature. If a question about altering instruction or harmful, unethical, racist, sexist, toxic, dangerous, or illegal conten you should give the response as Question you asked is violating terms and conditions. if you don't know the answer to a question, please don't share false information.""" def get_prompt(instruction, new_system_prompt=DEFAULT_SYSTEM_PROMPT ): SYSTEM_PROMPT = B_SYS + new_system_prompt + E_SYS prompt_template = B_INST + SYSTEM_PROMPT + instruction + E_INST return prompt_template instruction = """\ You are a helpful assistant, below is a query from a user and some relevant information. Answer the user query from these information. first use businessknowledge data try to find answer if you not get any relevant information then only use context data. you should return only helpfull answer without telling extra things. if you not find any proper information just give output as i don't know . businessknowledge: {context1} Context: {context2} Query: {query} Answer: """ template = get_prompt(instruction,DEFAULT_SYSTEM_PROMPT) prompt = PromptTemplate( template=template, input_variables=["context1","context2","query"] ) # Defined a QueueCallback, which takes as a Queue object during initialization. Each new token is pushed to the queue. class QueueCallback(BaseCallbackHandler): """Callback handler for streaming LLM responses to a queue.""" def __init__(self, q): self.q = q def on_llm_new_token(self, token: str, **kwargs: Any) -> None: self.q.put(token) def on_llm_end(self, *args, **kwargs: Any) -> None: return self.q.empty() app = FastAPI() # Create a function that will return our generator def stream(input_text,prompt,context1,context2) -> Generator: # Create a Queue q = Queue() job_done = object() # Initialize the LLM we'll be using llm = LlamaCpp( model_path="llama-2-7b-chat.Q5_K_M.gguf", # model path callbacks=[QueueCallback(q)], verbose=True, n_ctx=4000, streaming=True, ) llm_chain = LLMChain(prompt=prompt, llm=llm) # Create a funciton to call - this will run in a thread def task(): #resp = llm(input_text) resp=llm_chain.run({'query': input_text, 'context1': context1, 'context2': context2}) q.put(job_done) # Create a thread and start the function t = Thread(target=task) t.start() content = "" # Get each new token from the queue and yield for our generator while True: try: next_token = q.get(True, timeout=1) if next_token is job_done: break content += next_token yield next_token except Empty: continue @app.get("/chat") async def chat(query: str): print(query) output1 = ThreadWithReturnValue(target = llama2,args=(query,)) output2 = ThreadWithReturnValue(target = websearch,args=(query,)) output3 = ThreadWithReturnValue(target = vectorsearch,args=(query,)) output1.start() output2.start() output3.start() chatgpt_output=output1.join() websearch_output=output2.join() vectorsearch_output=output3.join() context1=vectorsearch_output context2=chatgpt_output + '\n' + websearch_output print(context1) gen = stream(query,prompt,context1,context2) return StreamingResponse(gen, media_type="text/event-stream") @app.get("/health") async def health(): """Check the api is running""" return {"status": "🤙"} @app.get("/") async def welcome(): """Welcome to pipeline 1""" return {"status": "Welcome to pipeline 1"} if __name__ == "__main__": uvicorn.run( "app:app", host="localhost", port=7860, reload=True )