"""Chat-with-video helper: index a video's text log and answer questions on it.

The module wires a Together-hosted chat model into LangChain's
``ConversationalRetrievalChain`` on top of a FAISS index built from a
``<video_id>.log`` file.
"""

import os
import pickle
import sys
import time
import warnings
from typing import Any, Dict, List, Mapping, Optional

import numpy as np
#import together
from together import Together
from langchain.prompts import PromptTemplate
from langchain.llms.base import LLM
from langchain.chains import ConversationalRetrievalChain
from langchain_core.output_parsers import StrOutputParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
#from langchain.llms import HuggingFacePipeline
from langchain_community.llms import HuggingFacePipeline
#from langchain.embeddings import HuggingFaceEmbeddings
#from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import TextLoader
#from langchain.document_loaders import UnstructuredFileLoader
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain_community.vectorstores import FAISS
from langchain.utils import get_from_dict_or_env
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from pydantic.v1 import Extra, Field, root_validator
from langchain.memory import ConversationBufferMemory
from langchain import LLMChain, PromptTemplate

from utils import logger_creator, format_time

warnings.filterwarnings("ignore", category=UserWarning)

# Llama-2 chat prompt markers.
# NOTE(review): the previous values ("<>\n" / "\n<>\n\n") look like the result
# of an HTML/tag stripper eating "<SYS>" and "</SYS>"; restored to the
# documented Llama-2 format -- confirm against the target model.
B_INST, E_INST = "[INST]", "[/INST]"
B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"
DEFAULT_SYSTEM_PROMPT = ""

instruction = """You are an AI assistant designed for answering questions about a video.
You are given a document and a question, the document records what people see and hear from this video.
Try to connet these information and provide a conversational answer.
Question: {question}
=========
{context}
=========
"""

system_prompt = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question.
You can assume the discussion is about the video content.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""


def get_prompt(instruction, new_system_prompt=DEFAULT_SYSTEM_PROMPT):
    """Wrap an instruction and a system prompt in the [INST]/SYS markers.

    Args:
        instruction: The user-facing instruction text (may contain
            ``{placeholders}``).
        new_system_prompt: System prompt placed between ``B_SYS`` and
            ``E_SYS``.

    Returns:
        The assembled prompt template string.
    """
    wrapped_system = B_SYS + new_system_prompt + E_SYS
    return B_INST + wrapped_system + instruction + E_INST


template = get_prompt(instruction, system_prompt)

# The declared variables must match the placeholders that actually appear in
# ``template`` ({chat_history}, {question}, {context}).
prompt = PromptTemplate(
    input_variables=["chat_history", "question", "context"],
    template=template,
)


class TogetherLLM(LLM):
    """LangChain ``LLM`` wrapper around the Together chat-completions API."""

    # Model endpoint to use.
    model: str = "togethercomputer/llama-2-70b-chat"
    # Together API key. Left unset here so that importing this module does
    # not require the environment variable; ``validate_environment`` resolves
    # it from ``TOGETHER_API_KEY`` when the instance is created.
    together_api_key: Optional[str] = None
    # Sampling temperature.
    temperature: float = 0.7
    # Maximum number of tokens to generate in the completion.
    max_tokens: int = 512

    class Config:
        extra = 'forbid'  # Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the API key is set (argument or environment)."""
        values["together_api_key"] = get_from_dict_or_env(
            values, "together_api_key", "TOGETHER_API_KEY"
        )
        return values

    @property
    def _llm_type(self) -> str:
        """Return type of LLM."""
        return "together"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Any = None,
        **kwargs: Any,
    ) -> str:
        """Send ``prompt`` to the Together endpoint and return the reply text.

        Args:
            prompt: Fully rendered prompt, sent as a single user message.
            stop: Optional stop sequences supplied by LangChain; when absent
                the end-of-sequence marker ``</s>`` is used.
            run_manager: LangChain callback manager (unused).
            **kwargs: Extra keyword arguments from LangChain (unused).

        Returns:
            The content of the first completion choice.
        """
        client = Together(api_key=self.together_api_key)

        # The prompt must be passed as part of a structured 'messages' list.
        # NOTE(review): the trailing empty assistant turn is kept from the
        # original implementation -- confirm the API needs it.
        messages = [
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": ""},
        ]

        # NOTE(review): the previous default was stop=[""], which looks like
        # a tag-stripped "</s>" -- confirm for the target model.
        stop_sequences = stop if stop else ["</s>"]

        output = client.chat.completions.create(
            model=self.model,
            messages=messages,
            # Honour the instance configuration instead of hard-coded values.
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            top_p=0.7,
            top_k=50,
            repetition_penalty=1,
            stop=stop_sequences,
        )
        text = output.choices[0].message.content
        print(f"Answer: {text}")
        return text


class Vlogger4chat:
    """Answer questions about a video from its pre-computed text log."""

    def __init__(self, args):
        """Store configuration, create the LLM and the embedding model.

        Args:
            args: Namespace-like object providing ``alpha``, ``beta``,
                ``data_dir`` and ``tmp_dir``.
        """
        self.args = args
        self.alpha = args.alpha
        self.beta = args.beta
        # A stray trailing comma used to turn this into a 1-tuple; tolerate
        # tuple input as well so existing callers keep working.
        self.data_dir = self._first_if_tuple(args.data_dir)
        self.tmp_dir = args.tmp_dir
        self.models_flag = False
        # Populated once a video log has been indexed.
        self.video_path = None
        self.vector_storage = None
        self.chain = None
        self.init_llm()
        self.history = []
        self.my_embedding = HuggingFaceEmbeddings(
            model_name='BAAI/bge-m3',
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True},
        )

    @staticmethod
    def _first_if_tuple(value):
        """Return ``value[0]`` for a non-empty tuple, otherwise ``value``."""
        if isinstance(value, tuple) and value:
            return value[0]
        return value

    @staticmethod
    def _video_id_from_path(video_path):
        """Derive the video id: the file name up to its first dot."""
        return os.path.basename(video_path).split('.')[0]

    def _build_chain(self, log_path):
        """Load ``log_path``, index it in FAISS and create the QA chain.

        Returns:
            True when the index and chain were built, False when the log
            yielded no documents or no chunks.
        """
        raw_documents = UnstructuredFileLoader(log_path).load()
        if not raw_documents:
            print("The log file is empty or could not be loaded.")
            return False

        # Split text
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        chunks = splitter.split_documents(raw_documents)
        if not chunks:
            # Whitespace-only content produces no chunks; FAISS cannot index
            # an empty list.
            print("The log file is empty or could not be loaded.")
            return False

        self.vector_storage = FAISS.from_documents(chunks, self.my_embedding)
        self.chain = ConversationalRetrievalChain.from_llm(
            self.llm,
            self.vector_storage.as_retriever(),
            return_source_documents=True,
        )
        return True

    def init_llm(self):
        """Create the Together-backed LLM used for reasoning.

        The model name is read from the ``YOUR_MODEL_NAME`` environment
        variable; when it is unset the ``TogetherLLM`` default is used.
        """
        print('\033[1;33m' + "Initializing LLM Reasoner...".center(50, '-') + '\033[0m')
        llm_kwargs = {"temperature": 0.1, "max_tokens": 768}
        model_name = os.getenv("YOUR_MODEL_NAME")
        if model_name:
            llm_kwargs["model"] = model_name
        self.llm = TogetherLLM(**llm_kwargs)
        print('\033[1;32m' + "LLM initialization finished!".center(50, '-') + '\033[0m')

    def exist_videolog(self, video_id):
        """Index the log of ``video_id`` if it exists on disk.

        Args:
            video_id: Video identifier (a 1-tuple is unwrapped).

        Returns:
            True when the log exists and the retrieval chain was built,
            False otherwise.
        """
        # Pick the first element if a tuple slipped in.
        self.data_dir = self._first_if_tuple(self.data_dir)
        video_id = self._first_if_tuple(video_id)

        log_path = os.path.join(self.data_dir, f"{video_id}.log")
        if not os.path.exists(log_path):
            return False
        return self._build_chain(log_path)

    def create_videolog(self, video_id):
        """Build the retrieval chain from the current video's log file.

        Args:
            video_id: Used only when no video path has been registered via
                ``video2log``; otherwise the id derived from
                ``self.video_path`` takes precedence (original behaviour).

        Raises:
            ValueError: If the log file contains nothing to index.
        """
        video_path = getattr(self, "video_path", None)
        if video_path:
            video_id = self._video_id_from_path(video_path)
        else:
            video_id = self._first_if_tuple(video_id)

        data_dir = self._first_if_tuple(self.data_dir)
        log_path = os.path.join(data_dir, video_id + '.log')
        if not self._build_chain(log_path):
            raise ValueError(f"No indexable content in log file: {log_path}")

    def video2log(self, video_path):
        """Register ``video_path``, index its log if present, return the log.

        Returns:
            The log lines (stripped) as a list of strings.

        Raises:
            FileNotFoundError: If the log file does not exist.
        """
        self.video_path = video_path
        video_id = self._video_id_from_path(video_path)
        # Builds the retrieval chain as a side effect when the log exists.
        self.exist_videolog(video_id)
        return self.printlog(video_id)

    def printlog(self, video_id):
        """Read ``<data_dir>/<video_id>.log`` and return its stripped lines."""
        data_dir = self._first_if_tuple(self.data_dir)
        log_path = os.path.join(data_dir, video_id + '.log')
        with open(log_path, 'r', encoding='utf-8') as f:
            return [line.strip() for line in f]

    def chat2video(self, question):
        """Ask ``question`` about the indexed video and record the exchange.

        Returns:
            The chain's answer string.

        Raises:
            RuntimeError: If no video log has been indexed yet.
        """
        if getattr(self, "chain", None) is None:
            raise RuntimeError(
                "No video log has been indexed; call video2log() first."
            )
        print(f"Question: {question}")
        # The prefix asks the model to answer in Chinese.
        result = self.chain.invoke(
            {"question": "请用中文回答:" + question, "chat_history": self.history}
        )
        response = result['answer']
        self.history.append((question, response))
        return response

    def clean_history(self):
        """Forget the accumulated chat history."""
        self.history = []
        return