import os from enum import Enum from typing import Callable, Tuple from langchain.agents.agent import AgentExecutor from langchain.agents.tools import BaseTool, Tool from typing import Optional from langchain.agents import load_tools from langchain.agents.tools import BaseTool from langchain.llms.base import BaseLLM from langchain.chat_models import ChatOpenAI import requests from bs4 import BeautifulSoup # import llama_index # from llama_index import GPTVectorStoreIndex # from llama_index.readers.database import DatabaseReader # from logger import logger from logger import logger class ToolScope(Enum): GLOBAL = "global" SESSION = "session" SessionGetter = Callable[[], Tuple[str, AgentExecutor]] openai_api_key = os.environ["OPENAI_API_KEY"] base_url = os.getenv("OPENAI_BASE_URL", "") # Optionally use base URL from env llm = ChatOpenAI(model_name="gpt-4", temperature=1.0, openai_api_key=openai_api_key, base_url=base_url) def tool( name: str, description: str, scope: ToolScope = ToolScope.GLOBAL, ): def decorator(func): func.name = name func.description = description func.is_tool = True func.scope = scope return func return decorator class ToolWrapper: def __init__(self, name: str, description: str, scope: ToolScope, func): self.name = name self.description = description self.scope = scope self.func = func def is_global(self) -> bool: return self.scope == ToolScope.GLOBAL def is_per_session(self) -> bool: return self.scope == ToolScope.SESSION def to_tool( self, get_session: SessionGetter = lambda: [], ) -> BaseTool: func = self.func if self.is_per_session(): func = lambda *args, **kwargs: self.func( *args, **kwargs, get_session=get_session ) return Tool( name=self.name, description=self.description, func=func, ) class BaseToolSet: def tool_wrappers(cls) -> list[ToolWrapper]: methods = [ getattr(cls, m) for m in dir(cls) if hasattr(getattr(cls, m), "is_tool") ] return [ToolWrapper(m.name, m.description, m.scope, m) for m in methods] class RequestsGet(BaseToolSet): @tool( name="Requests Get", description="A portal to the internet. " "Use this when you need to get specific content from a website." "Input should be a url (i.e. https://www.google.com)." "The output will be the text response of the GET request.", ) def get(self, url: str) -> str: """Run the tool.""" html = requests.get(url).text soup = BeautifulSoup(html) non_readable_tags = soup.find_all( ["script", "style", "header", "footer", "form"] ) for non_readable_tag in non_readable_tags: non_readable_tag.extract() content = soup.get_text("\n", strip=True) if len(content) > 300: content = content[:300] + "..." logger.debug( f"\nProcessed RequestsGet, Input Url: {url} " f"Output Contents: {content}" ) return content # class WineDB(BaseToolSet): # def __init__(self): # db = DatabaseReader( # scheme="postgresql", # Database Scheme # host=settings["WINEDB_HOST"], # Database Host # port="5432", # Database Port # user="alphadom", # Database User # password=settings["WINEDB_PASSWORD"], # Database Password # dbname="postgres", # Database Name # ) # self.columns = ["nameEn", "nameKo", "description"] # concat_columns = str(",'-',".join([f'"{i}"' for i in self.columns])) # query = f""" # SELECT # Concat({concat_columns}) # FROM wine # """ # documents = db.load_data(query=query) # self.index = GPTVectorStoreIndex(documents) # @tool( # name="Wine Recommendation", # description="A tool to recommend wines based on a user's input. " # "Inputs are necessary factors for wine recommendations, such as the user's mood today, side dishes to eat with wine, people to drink wine with, what things you want to do, the scent and taste of their favorite wine." # "The output will be a list of recommended wines." # "The tool is based on a database of wine reviews, which is stored in a database.", # ) # def recommend(self, query: str) -> str: # """Run the tool.""" # results = self.index.query(query) # wine = "\n".join( # [ # f"{i}:{j}" # for i, j in zip( # self.columns, results.source_nodes[0].source_text.split("-") # ) # ] # ) # output = results.response + "\n\n" + wine # logger.debug( # f"\nProcessed WineDB, Input Query: {query} " f"Output Wine: {wine}" # ) # return output class ExitConversation(BaseToolSet): @tool( name="Exit Conversation", description="A tool to exit the conversation. " "Use this when you want to exit the conversation. " "The input should be a message that the conversation is over.", scope=ToolScope.SESSION, ) def exit(self, message: str, get_session: SessionGetter) -> str: """Run the tool.""" _, executor = get_session() del executor logger.debug(f"\nProcessed ExitConversation.") return message class ToolsFactory: @staticmethod def from_toolset( toolset: BaseToolSet, only_global: Optional[bool] = False, only_per_session: Optional[bool] = False, get_session: SessionGetter = lambda: [], ) -> list[BaseTool]: tools = [] for wrapper in toolset.tool_wrappers(): if only_global and not wrapper.is_global(): continue if only_per_session and not wrapper.is_per_session(): continue tools.append(wrapper.to_tool(get_session=get_session)) return tools @staticmethod def create_global_tools( toolsets: list[BaseToolSet], ) -> list[BaseTool]: tools = [] for toolset in toolsets: tools.extend( ToolsFactory.from_toolset( toolset=toolset, only_global=True, ) ) return tools @staticmethod def create_per_session_tools( toolsets: list[BaseToolSet], get_session: SessionGetter = lambda: [], ) -> list[BaseTool]: tools = [] for toolset in toolsets: tools.extend( ToolsFactory.from_toolset( toolset=toolset, only_per_session=True, get_session=get_session, ) ) return tools @staticmethod def create_global_tools_from_names( toolnames: list[str], llm: Optional[BaseLLM], ) -> list[BaseTool]: return load_tools(toolnames, llm=llm, base_url=base_url) ##########################################+> # ##########################################+> SYS # import signal # from typing import Optional, Tuple # from ptrace.debugger import ( # NewProcessEvent, # ProcessExecution, # ProcessExit, # ProcessSignal, # PtraceDebugger, # PtraceProcess, # ) # from ptrace.func_call import FunctionCallOptions # from ptrace.syscall import PtraceSyscall # from ptrace.tools import signal_to_exitcode # class SyscallTimeoutException(Exception): # def __init__(self, pid: int, *args) -> None: # super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args) # class SyscallTracer: # def __init__(self, pid: int): # self.debugger: PtraceDebugger = PtraceDebugger() # self.pid: int = pid # self.process: PtraceProcess = None # def is_waiting(self, syscall: PtraceSyscall) -> bool: # if syscall.name.startswith("wait"): # return True # return False # def attach(self): # self.process = self.debugger.addProcess(self.pid, False) # def detach(self): # self.process.detach() # self.debugger.quit() # def set_timer(self, timeout: int): # def handler(signum, frame): # raise SyscallTimeoutException(self.process.pid) # signal.signal(signal.SIGALRM, handler) # signal.alarm(timeout) # def reset_timer(self): # signal.alarm(0) # def wait_syscall_with_timeout(self, timeout: int): # self.set_timer(timeout) # self.process.waitSyscall() # self.reset_timer() # def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]: # self.process.syscall() # exitcode = None # reason = "" # while True: # if not self.debugger: # break # try: # self.wait_syscall_with_timeout(30) # except ProcessExit as event: # if event.exitcode is not None: # exitcode = event.exitcode # continue # except ProcessSignal as event: # event.process.syscall(event.signum) # exitcode = signal_to_exitcode(event.signum) # reason = event.reason # continue # except NewProcessEvent as event: # continue # except ProcessExecution as event: # continue # except Exception as e: # reason = str(e) # break # syscall = self.process.syscall_state.event( # FunctionCallOptions( # write_types=False, # write_argname=False, # string_max_length=300, # replace_socketcall=True, # write_address=False, # max_array_count=20, # ) # ) # self.process.syscall() # if syscall is None: # continue # if syscall.result: # continue # self.reset_timer() # return exitcode, reason ##########################################+> SYS CALL END ############### => st dout.py import os import time import subprocess from datetime import datetime from typing import Callable, Literal, Optional, Union, Tuple PipeType = Union[Literal["stdout"], Literal["stderr"]] class StdoutTracer: def __init__( self, process: subprocess.Popen, timeout: int = 30, interval: int = 0.1, on_output: Callable[[PipeType, str], None] = lambda: None, ): self.process: subprocess.Popen = process self.timeout: int = timeout self.interval: int = interval self.last_output: datetime = None self.on_output: Callable[[PipeType, str], None] = on_output def nonblock(self): os.set_blocking(self.process.stdout.fileno(), False) os.set_blocking(self.process.stderr.fileno(), False) def get_output(self, pipe: PipeType) -> str: output = None if pipe == "stdout": output = self.process.stdout.read() elif pipe == "stderr": output = self.process.stderr.read() if output: decoded = output.decode() self.on_output(pipe, decoded) self.last_output = datetime.now() return decoded return "" def last_output_passed(self, seconds: int) -> bool: return (datetime.now() - self.last_output).seconds > seconds def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]: self.nonblock() self.last_output = datetime.now() output = "" exitcode = None while True: new_stdout = self.get_output("stdout") if new_stdout: output += new_stdout new_stderr = self.get_output("stderr") if new_stderr: output += new_stderr if self.process.poll() is not None: exitcode = self.process.poll() break if self.last_output_passed(self.timeout): self.process.kill() break time.sleep(self.interval) return (exitcode, output) ################## => stdout end import os import subprocess import time from datetime import datetime from typing import Dict, List from swarms.utils.utils import ANSI, Color, Style # test class Terminal(BaseToolSet): def __init__(self): self.sessions: Dict[str, List[SyscallTracer]] = {} @tool( name="Terminal", description="Executes commands in a terminal." "If linux errno occurs, we have to solve the problem with the terminal. " "Input must be one valid command. " "Output will be any output from running that command.", scope=ToolScope.SESSION, ) def execute(self, commands: str, get_session: SessionGetter) -> str: session, _ = get_session() try: process = subprocess.Popen( commands, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ") output = "" tracer = StdoutTracer( process, on_output=lambda p, o: logger.info( ANSI(p).to(Style.dim()) + " " + o.strip("\n") ), ) exitcode, output = tracer.wait_until_stop_or_exit() except Exception as e: output = str(e) logger.debug( f"\nProcessed Terminal, Input Commands: {commands} " f"Output Answer: {output}" ) return output # if __name__ == "__main__": # import time # o = Terminal().execute( # "sleep 1; echo 1; sleep 2; echo 2; sleep 3; echo 3; sleep 10;", # lambda: ("", None), # ) # print(o) # time.sleep(10) # see if timer has reset ###################=> EDITOR/VERIFY from pathlib import Path def verify(func): def wrapper(*args, **kwargs): try: filepath = args[0].filepath except AttributeError: raise Exception("This tool doesn't have filepath. Please check your code.") if not str(Path(filepath).resolve()).startswith(str(Path().resolve())): return "You can't access file outside of playground." return func(*args, **kwargs) return wrapper #=====================> EDITOR/END VERIFY ###### EDITOR/WRITE.PY """ write protocol: """ class WriteCommand: separator = "\n" def __init__(self, filepath: str, content: int): self.filepath: str = filepath self.content: str = content self.mode: str = "w" def with_mode(self, mode: str) -> "WriteCommand": self.mode = mode return self @verify def execute(self) -> str: dir_path = os.path.dirname(self.filepath) if dir_path: os.makedirs(dir_path, exist_ok=True) with open(self.filepath, self.mode) as f: f.write(self.content) return self.content @staticmethod def from_str(command: str) -> "WriteCommand": filepath = command.split(WriteCommand.separator)[0] return WriteCommand(filepath, command[len(filepath) + 1 :]) class CodeWriter: @staticmethod def write(command: str) -> str: return WriteCommand.from_str(command).with_mode("w").execute() @staticmethod def append(command: str) -> str: return WriteCommand.from_str(command).with_mode("a").execute() #================> END #============================> EDITOR/READ.PY """ read protocol: |- """ class Line: def __init__(self, content: str, line_number: int, depth: int): self.__content: str = content self.__line_number: int = line_number self.__depth: int = depth self.__children: List[Line] = [] def get_content(self) -> str: return self.__content def get_depth(self) -> int: return self.__depth def append_child(self, child: "Line") -> None: self.__children.append(child) def find_by_lte_depth(self, depth: int) -> List["Line"]: if self.__depth > depth: return [] lines: List[Line] = [self] for child in self.__children: lines += child.find_by_lte_depth(depth) return lines def find_by_content(self, content: str) -> List["Line"]: if content in self.__content: return [self] lines: List[Line] = [] for child in self.__children: lines += child.find_by_content(content) return lines def find_last_lines(self) -> List["Line"]: if len(self.__children) == 0: return [self] else: return [self, *self.__children[-1].find_last_lines()] def print(self, depth: int = 0) -> None: print(f"{' ' * depth}{self}", end="") for child in self.__children: child.print(depth + 1) def __repr__(self): return f"{self.__line_number}: {self.__content}" class CodeTree: def __init__(self): self.root: Line = Line("\n", -1, -1) def append(self, content: str, line_number: int) -> None: last_lines: List[Line] = self.root.find_last_lines() new_leading_spaces: int = self.__get_leading_spaces(content) previous_line: Line = self.root previous_leading_spaces: int = -1 for line in last_lines: leading_spaces = self.__get_leading_spaces(line.get_content()) if ( previous_leading_spaces < new_leading_spaces and new_leading_spaces <= leading_spaces ): break previous_line, previous_leading_spaces = line, leading_spaces new_line_depth: int = previous_line.get_depth() + 1 previous_line.append_child(Line(content, line_number, new_line_depth)) def find_from_root(self, depth: int) -> List[Line]: return self.root.find_by_lte_depth(depth) def find_from_parent(self, depth: int, parent_content: str) -> List[Line]: lines: List[Line] = self.root.find_by_content(parent_content) if len(lines) == 0: return [] parent = lines[0] return parent.find_by_lte_depth(depth + parent.get_depth()) def print(self): print("Code Tree:") print("=================================") self.root.print() print("=================================") def __get_leading_spaces(self, content: str) -> int: return len(content) - len(content.lstrip()) class ReadCommand: separator = "|" def __init__(self, filepath: str, start: int, end: int): self.filepath: str = filepath self.start: int = start self.end: int = end @verify def execute(self) -> str: with open(self.filepath, "r") as f: code = f.readlines() if self.start == self.end: code = code[self.start - 1] else: code = "".join(code[self.start - 1 : self.end]) return code @staticmethod def from_str(command: str) -> "ReadCommand": filepath, line = command.split(ReadCommand.separator) start, end = line.split("-") return ReadCommand(filepath, int(start), int(end)) class SummaryCommand: separator = "|" def __init__(self, filepath: str, depth: int, parent_content: Optional[str] = None): self.filepath: str = filepath self.depth: int = depth self.parent_content: Optional[str] = parent_content @verify def execute(self) -> str: with open(self.filepath, "r") as f: code = f.readlines() code_tree = CodeTree() for i, line in enumerate(code): if line.strip() != "": code_tree.append(line, i + 1) if self.parent_content is None: lines = code_tree.find_from_root(self.depth) else: lines = code_tree.find_from_parent(self.depth, self.parent_content) return "".join([str(line) for line in lines]) @staticmethod def from_str(command: str) -> "SummaryCommand": command_list: List[str] = command.split(SummaryCommand.separator) filepath: str = command_list[0] depth: int = int(command_list[1]) parent_content: str | None = command_list[2] if len(command_list) == 3 else None return SummaryCommand( filepath=filepath, depth=depth, parent_content=parent_content ) class CodeReader: @staticmethod def read(command: str) -> str: return ReadCommand.from_str(command).execute() @staticmethod def summary(command: str) -> str: return SummaryCommand.from_str(command).execute() # if __name__ == "__main__": # summary = CodeReader.summary("read.py|1|class ReadCommand:") # print(summary) #============================> EDITOR/READ.PY END #=================================> EDITOR/PATCH.PY """ patch protocol: |,|,| ---~~~+++===+++~~~--- |,|,| ---~~~+++===+++~~~--- ... ---~~~+++===+++~~~--- let say original code is: ``` import requests def crawl_news(keyword): url = f"https://www.google.com/search?q={keyword}+news" response = requests.get(url) news = [] for result in response: news.append(result.text) return news ``` and we want to change it to: ``` import requests from bs4 import BeautifulSoup def crawl_news(keyword): url = f"https://www.google.com/search?q={keyword}+news" html = requests.get(url).text soup = BeautifulSoup(html, "html.parser") news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd") news_titles = [] for result in news_results: news_titles.append(result.text) return news_titles ``` then the command will be: test.py|2,1|2,1|from bs4 import BeautifulSoup ---~~~+++===+++~~~--- test.py|5,5|5,33|html = requests.get(url).text soup = BeautifulSoup(html, "html.parser") news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd") ---~~~+++===+++~~~--- test.py|7,5|9,13|news_titles = [] for result in news_results: news_titles ---~~~+++===+++~~~--- test.py|11,16|11,16|_titles """ import re class Position: separator = "," def __init__(self, line: int, col: int): self.line: int = line self.col: int = col def __str__(self): return f"(Ln {self.line}, Col {self.col})" @staticmethod def from_str(pos: str) -> "Position": line, col = pos.split(Position.separator) return Position(int(line) - 1, int(col) - 1) class PatchCommand: separator = "|" def __init__(self, filepath: str, start: Position, end: Position, content: str): self.filepath: str = filepath self.start: Position = start self.end: Position = end self.content: str = content def read_lines(self) -> list[str]: with open(self.filepath, "r") as f: lines = f.readlines() return lines def write_lines(self, lines: list[str]) -> int: with open(self.filepath, "w") as f: f.writelines(lines) return sum([len(line) for line in lines]) @verify def execute(self) -> Tuple[int, int]: lines = self.read_lines() before = sum([len(line) for line in lines]) lines[self.start.line] = ( lines[self.start.line][: self.start.col] + self.content + lines[self.end.line][self.end.col :] ) lines = lines[: self.start.line + 1] + lines[self.end.line + 1 :] after = self.write_lines(lines) written = len(self.content) deleted = before - after + written return written, deleted @staticmethod def from_str(command: str) -> "PatchCommand": match = re.search( r"(.*)\|([0-9]*),([0-9]*)\|([0-9]*),([0-9]*)(\||\n)(.*)", command, re.DOTALL, ) filepath = match.group(1) start_line = match.group(2) start_col = match.group(3) end_line = match.group(4) end_col = match.group(5) content = match.group(7) return PatchCommand( filepath, Position.from_str(f"{start_line},{start_col}"), Position.from_str(f"{end_line},{end_col}"), content, ) class CodePatcher: separator = "\n---~~~+++===+++~~~---\n" @staticmethod def sort_commands(commands: list[PatchCommand]) -> list[PatchCommand]: return sorted(commands, key=lambda c: c.start.line, reverse=True) @staticmethod def patch(bulk_command: str) -> Tuple[int, int]: commands = [ PatchCommand.from_str(command) for command in bulk_command.split(CodePatcher.separator) if command != "" ] commands = CodePatcher.sort_commands(commands) written, deleted = 0, 0 for command in commands: if command: w, d = command.execute() written += w deleted += d return written, deleted # if __name__ == "__main__": # commands = """test.py|2,1|2,1|from bs4 import BeautifulSoup # ---~~~+++===+++~~~--- # test.py|5,5|5,33|html = requests.get(url).text # soup = BeautifulSoup(html, "html.parser") # news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd") # ---~~~+++===+++~~~--- # test.py|7,5|9,13|news_titles = [] # for result in news_results: # news_titles # ---~~~+++===+++~~~--- # test.py|11,16|11,16|_titles # """ # example = """import requests # def crawl_news(keyword): # url = f"https://www.google.com/search?q={keyword}+news" # response = requests.get(url) # news = [] # for result in response: # news.append(result.text) # return news # """ # testfile = "test.py" # with open(testfile, "w") as f: # f.write(example) # patcher = CodePatcher() # written, deleted = patcher.patch(commands) # print(f"written: {written}, deleted: {deleted}") ####################### => EDITOR/PATCH.PY ###################### EDITOR// INIT.PY class CodeEditor(BaseToolSet): @tool( name="CodeEditor.READ", description="Read and understand code. " f"Input should be filename and line number group. ex. test.py|1-10 " "and the output will be code. ", ) def read(self, inputs: str) -> str: try: output = CodeReader.read(inputs) except Exception as e: output = str(e) logger.debug( f"\nProcessed CodeEditor.READ, Input Commands: {inputs} " f"Output Answer: {output}" ) return output @tool( name="CodeEditor.SUMMARY", description="Summary code. " "Read the code structured into a tree. " "If you set specific line, it will show the code from the specific line. " "Input should be filename, depth, and specific line if you want. ex. test.py|2 or test.py|3|print('hello world') " "and the output will be list of (line number: code). ", ) def summary(self, inputs: str) -> str: try: output = CodeReader.summary(inputs) except Exception as e: output = str(e) logger.debug( f"\nProcessed CodeEditor.SUMMARY, Input Commands: {inputs} " f"Output Answer: {output}" ) return output @tool( name="CodeEditor.APPEND", description="Append code to the existing file. " "If the code is completed, use the Terminal tool to execute it, if not, append the code through the this tool. " "Input should be filename and code to append. " "Input code must be the code that should be appended, NOT whole code. " "ex. test.py\nprint('hello world')\n " "and the output will be last 3 lines.", ) def append(self, inputs: str) -> str: try: code = CodeWriter.append(inputs) output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:]) except Exception as e: output = str(e) logger.debug( f"\nProcessed CodeEditor.APPEND, Input: {inputs} " f"Output Answer: {output}" ) return output @tool( name="CodeEditor.WRITE", description="Write code to create a new tool. " "If the code is completed, use the Terminal tool to execute it, if not, append the code through the CodeEditor.APPEND tool. " "Input should be formatted like: " "\n\n\n" "Here is an example: " "test.py\nmessage = 'hello world'\nprint(message)\n" "\n" "The output will be last 3 lines you wrote.", ) def write(self, inputs: str) -> str: try: code = CodeWriter.write(inputs.lstrip()) output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:]) except Exception as e: output = str(e) logger.debug( f"\nProcessed CodeEditor.WRITE, Input: {inputs} " f"Output Answer: {output}" ) return output # @tool( # name="CodeEditor.PATCH", # description="Patch the code to correct the error if an error occurs or to improve it. " # "Input is a list of patches. The patch is separated by {seperator}. ".format( # seperator=CodePatcher.separator.replace("\n", "\\n") # ) # + "Each patch has to be formatted like below.\n" # "|,|,|" # "Here is an example. If the original code is:\n" # "print('hello world')\n" # "and you want to change it to:\n" # "print('hi corca')\n" # "then the patch should be:\n" # "test.py|1,8|1,19|hi corca\n" # "Code between start and end will be replaced with new_code. " # "The output will be written/deleted bytes or error message. ", # ) def patch(self, patches: str) -> str: try: w, d = CodePatcher.patch(patches) output = f"successfully wrote {w}, deleted {d}" except Exception as e: output = str(e) logger.debug( f"\nProcessed CodeEditor.PATCH, Input Patch: {patches} " f"Output Answer: {output}" ) return output @tool( name="CodeEditor.DELETE", description="Delete code in file for a new start. " "Input should be filename." "ex. test.py " "Output will be success or error message.", ) def delete(self, inputs: str) -> str: try: with open(filepath, "w") as f: f.write("") output = "success" except Exception as e: output = str(e) logger.debug( f"\nProcessed CodeEditor.DELETE, Input filename: {inputs} " f"Output Answer: {output}" ) return output ###################### EDITOR// INIT.PY END ########################### MODELS import uuid import numpy as np import torch from diffusers import ( EulerAncestralDiscreteScheduler, StableDiffusionInpaintPipeline, StableDiffusionInstructPix2PixPipeline, StableDiffusionPipeline, ) from PIL import Image from transformers import ( BlipForQuestionAnswering, BlipProcessor, CLIPSegForImageSegmentation, CLIPSegProcessor, ) from swarms.utils.utils import get_new_image_name class MaskFormer(BaseToolSet): def __init__(self, device): print("Initializing MaskFormer to %s" % device) self.device = device self.processor = CLIPSegProcessor.from_pretrained("CIDAS/clipseg-rd64-refined") self.model = CLIPSegForImageSegmentation.from_pretrained( "CIDAS/clipseg-rd64-refined" ).to(device) def inference(self, image_path, text): threshold = 0.5 min_area = 0.02 padding = 20 original_image = Image.open(image_path) image = original_image.resize((512, 512)) inputs = self.processor( text=text, images=image, padding="max_length", return_tensors="pt" ).to(self.device) with torch.no_grad(): outputs = self.model(**inputs) mask = torch.sigmoid(outputs[0]).squeeze().cpu().numpy() > threshold area_ratio = len(np.argwhere(mask)) / (mask.shape[0] * mask.shape[1]) if area_ratio < min_area: return None true_indices = np.argwhere(mask) mask_array = np.zeros_like(mask, dtype=bool) for idx in true_indices: padded_slice = tuple( slice(max(0, i - padding), i + padding + 1) for i in idx ) mask_array[padded_slice] = True visual_mask = (mask_array * 255).astype(np.uint8) image_mask = Image.fromarray(visual_mask) return image_mask.resize(original_image.size) class ImageEditing(BaseToolSet): def __init__(self, device): print("Initializing ImageEditing to %s" % device) self.device = device self.mask_former = MaskFormer(device=self.device) self.revision = "fp16" if "cuda" in device else None self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 self.inpaint = StableDiffusionInpaintPipeline.from_pretrained( "runwayml/stable-diffusion-inpainting", revision=self.revision, torch_dtype=self.torch_dtype, ).to(device) @tool( name="Remove Something From The Photo", description="useful when you want to remove and object or something from the photo " "from its description or location. " "The input to this tool should be a comma separated string of two, " "representing the image_path and the object need to be removed. ", ) def inference_remove(self, inputs): image_path, to_be_removed_txt = inputs.split(",") return self.inference_replace(f"{image_path},{to_be_removed_txt},background") @tool( name="Replace Something From The Photo", description="useful when you want to replace an object from the object description or " "location with another object from its description. " "The input to this tool should be a comma separated string of three, " "representing the image_path, the object to be replaced, the object to be replaced with ", ) def inference_replace(self, inputs): image_path, to_be_replaced_txt, replace_with_txt = inputs.split(",") original_image = Image.open(image_path) original_size = original_image.size mask_image = self.mask_former.inference(image_path, to_be_replaced_txt) updated_image = self.inpaint( prompt=replace_with_txt, image=original_image.resize((512, 512)), mask_image=mask_image.resize((512, 512)), ).images[0] updated_image_path = get_new_image_name( image_path, func_name="replace-something" ) updated_image = updated_image.resize(original_size) updated_image.save(updated_image_path) logger.debug( f"\nProcessed ImageEditing, Input Image: {image_path}, Replace {to_be_replaced_txt} to {replace_with_txt}, " f"Output Image: {updated_image_path}" ) return updated_image_path class InstructPix2Pix(BaseToolSet): def __init__(self, device): print("Initializing InstructPix2Pix to %s" % device) self.device = device self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 self.pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained( "timbrooks/instruct-pix2pix", safety_checker=None, torch_dtype=self.torch_dtype, ).to(device) self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config( self.pipe.scheduler.config ) @tool( name="Instruct Image Using Text", description="useful when you want to the style of the image to be like the text. " "like: make it look like a painting. or make it like a robot. " "The input to this tool should be a comma separated string of two, " "representing the image_path and the text. ", ) def inference(self, inputs): """Change style of image.""" logger.debug("===> Starting InstructPix2Pix Inference") image_path, text = inputs.split(",")[0], ",".join(inputs.split(",")[1:]) original_image = Image.open(image_path) image = self.pipe( text, image=original_image, num_inference_steps=40, image_guidance_scale=1.2 ).images[0] updated_image_path = get_new_image_name(image_path, func_name="pix2pix") image.save(updated_image_path) logger.debug( f"\nProcessed InstructPix2Pix, Input Image: {image_path}, Instruct Text: {text}, " f"Output Image: {updated_image_path}" ) return updated_image_path class Text2Image(BaseToolSet): def __init__(self, device): print("Initializing Text2Image to %s" % device) self.device = device self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 self.pipe = StableDiffusionPipeline.from_pretrained( "runwayml/stable-diffusion-v1-5", torch_dtype=self.torch_dtype ) self.pipe.to(device) self.a_prompt = "best quality, extremely detailed" self.n_prompt = ( "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, " "fewer digits, cropped, worst quality, low quality" ) @tool( name="Generate Image From User Input Text", description="useful when you want to generate an image from a user input text and save it to a file. " "like: generate an image of an object or something, or generate an image that includes some objects. " "The input to this tool should be a string, representing the text used to generate image. ", ) def inference(self, text): image_filename = os.path.join("image", str(uuid.uuid4())[0:8] + ".png") prompt = text + ", " + self.a_prompt image = self.pipe(prompt, negative_prompt=self.n_prompt).images[0] image.save(image_filename) logger.debug( f"\nProcessed Text2Image, Input Text: {text}, Output Image: {image_filename}" ) return image_filename class VisualQuestionAnswering(BaseToolSet): def __init__(self, device): print("Initializing VisualQuestionAnswering to %s" % device) self.torch_dtype = torch.float16 if "cuda" in device else torch.float32 self.device = device self.processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base") self.model = BlipForQuestionAnswering.from_pretrained( "Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype ).to(self.device) @tool( name="Answer Question About The Image", description="useful when you need an answer for a question based on an image. " "like: what is the background color of the last image, how many cats in this figure, what is in this figure. " "The input to this tool should be a comma separated string of two, representing the image_path and the question", ) def inference(self, inputs): image_path, question = inputs.split(",") raw_image = Image.open(image_path).convert("RGB") inputs = self.processor(raw_image, question, return_tensors="pt").to( self.device, self.torch_dtype ) out = self.model.generate(**inputs) answer = self.processor.decode(out[0], skip_special_tokens=True) logger.debug( f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input Question: {question}, " f"Output Answer: {answer}" ) return answer #segment anything: ########################### MODELS # #########==========================> # from selenium import webdriver # from langchain.tools import BaseTool # class BrowserActionTool(BaseTool): # name = "browser_action" # description = "Perform a browser action." # prompt = """ # Sure, here are few-shot prompts for each of the browser tools: # 1. **Go To URL Tool** # Prompt: "Navigate to the OpenAI homepage." # Command: `{ "action_type": "go_to", "url": "https://www.openai.com" }` # 2. **Form Submission Tool** # Prompt: "On the page 'https://www.formexample.com', find the form with the id 'login', set the 'username' field to 'testuser', and the 'password' field to 'testpassword', then submit the form." # Command: `{ "action_type": "submit_form", "form_id": "login", "form_values": { "username": "testuser", "password": "testpassword" } }` # 3. **Click Link Tool** # Prompt: "On the current page, find the link with the text 'About Us' and click it." # Command: `{ "action_type": "click_link", "link_text": "About Us" }` # 4. **Enter Text Tool** # Prompt: "On the page 'https://www.textentryexample.com', find the text area with the id 'message' and enter the text 'Hello World'." # Command: `{ "action_type": "enter_text", "text_area_id": "message", "text": "Hello World" }` # 5. **Button Click Tool** # Prompt: "On the current page, find the button with the id 'submit' and click it." # Command: `{ "action_type": "click_button", "button_id": "submit" }` # 6. **Select Option Tool** # Prompt: "On the page 'https://www.selectoptionexample.com', find the select dropdown with the id 'country' and select the option 'United States'." # Command: `{ "action_type": "select_option", "select_id": "country", "option": "United States" }` # 7. **Hover Tool** # Prompt: "On the current page, find the element with the id 'menu' and hover over it." # Command: `{ "action_type": "hover", "element_id": "menu" }` # 8. **Scroll Tool** # Prompt: "On the current page, scroll down to the element with the id 'footer'." # Command: `{ "action_type": "scroll", "element_id": "footer" }` # 9. **Screenshot Tool** # Prompt: "On the current page, take a screenshot." # Command: `{ "action_type": "screenshot" }` # 10. **Back Navigation Tool** # Prompt: "Navigate back to the previous page." # Command: `{ "action_type": "back" }` # """ # def _run(self, action_type: str, action_details: dict) -> str: # """Perform a browser action based on action_type and action_details.""" # try: # driver = webdriver.Firefox() # if action_type == 'Open Browser': # pass # Browser is already opened # elif action_type == 'Close Browser': # driver.quit() # elif action_type == 'Navigate To URL': # driver.get(action_details['url']) # elif action_type == 'Fill Form': # for field_name, field_value in action_details['fields'].items(): # element = driver.find_element_by_name(field_name) # element.send_keys(field_value) # elif action_type == 'Submit Form': # element = driver.find_element_by_name(action_details['form_name']) # element.submit() # elif action_type == 'Click Button': # element = driver.find_element_by_name(action_details['button_name']) # element.click() # elif action_type == 'Scroll Down': # driver.execute_script("window.scrollTo(0, document.body.scrollHeight);") # elif action_type == 'Scroll Up': # driver.execute_script("window.scrollTo(0, 0);") # elif action_type == 'Go Back': # driver.back() # elif action_type == 'Go Forward': # driver.forward() # elif action_type == 'Refresh': # driver.refresh() # elif action_type == 'Execute Javascript': # driver.execute_script(action_details['script']) # elif action_type == 'Switch Tab': # driver.switch_to.window(driver.window_handles[action_details['tab_index']]) # elif action_type == 'Close Tab': # driver.close() # else: # return f"Error: Unknown action type {action_type}." # return f"Action {action_type} completed successfully." # except Exception as e: # return f"Error: {e}" #--------------------------------------> END #--------------------------------------> AUTO GPT TOOLS # General import os import pandas as pd from langchain.agents.agent_toolkits.pandas.base import create_pandas_dataframe_agent from langchain.docstore.document import Document import asyncio import nest_asyncio # Tools from contextlib import contextmanager from typing import Optional from langchain.agents import tool from langchain.tools.file_management.read import ReadFileTool from langchain.tools.file_management.write import WriteFileTool ROOT_DIR = "./data/" from langchain.tools import BaseTool, DuckDuckGoSearchRun from langchain.text_splitter import RecursiveCharacterTextSplitter from pydantic import Field from langchain.chains.qa_with_sources.loading import load_qa_with_sources_chain, BaseCombineDocumentsChain @contextmanager def pushd(new_dir): """Context manager for changing the current working directory.""" prev_dir = os.getcwd() os.chdir(new_dir) try: yield finally: os.chdir(prev_dir) @tool def process_csv( csv_file_path: str, instructions: str, output_path: Optional[str] = None ) -> str: """Process a CSV by with pandas in a limited REPL.\ Only use this after writing data to disk as a csv file.\ Any figures must be saved to disk to be viewed by the human.\ Instructions should be written in natural language, not code. Assume the dataframe is already loaded.""" with pushd(ROOT_DIR): try: df = pd.read_csv(csv_file_path) except Exception as e: return f"Error: {e}" agent = create_pandas_dataframe_agent(llm, df, max_iterations=30, verbose=True) if output_path is not None: instructions += f" Save output to disk at {output_path}" try: result = agent.run(instructions) return result except Exception as e: return f"Error: {e}" async def async_load_playwright(url: str) -> str: """Load the specified URLs using Playwright and parse using BeautifulSoup.""" from bs4 import BeautifulSoup from playwright.async_api import async_playwright results = "" async with async_playwright() as p: browser = await p.chromium.launch(headless=True) try: page = await browser.new_page() await page.goto(url) page_source = await page.content() soup = BeautifulSoup(page_source, "html.parser") for script in soup(["script", "style"]): script.extract() text = soup.get_text() lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) results = "\n".join(chunk for chunk in chunks if chunk) except Exception as e: results = f"Error: {e}" await browser.close() return results def run_async(coro): event_loop = asyncio.get_event_loop() return event_loop.run_until_complete(coro) @tool def browse_web_page(url: str) -> str: """Verbose way to scrape a whole webpage. Likely to cause issues parsing.""" return run_async(async_load_playwright(url)) def _get_text_splitter(): return RecursiveCharacterTextSplitter( # Set a really small chunk size, just to show. chunk_size = 500, chunk_overlap = 20, length_function = len, ) class WebpageQATool(BaseTool): name = "query_webpage" description = "Browse a webpage and retrieve the information relevant to the question." text_splitter: RecursiveCharacterTextSplitter = Field(default_factory=_get_text_splitter) qa_chain: BaseCombineDocumentsChain def _run(self, url: str, question: str) -> str: """Useful for browsing websites and scraping the text information.""" result = browse_web_page.run(url) docs = [Document(page_content=result, metadata={"source": url})] web_docs = self.text_splitter.split_documents(docs) results = [] # TODO: Handle this with a MapReduceChain for i in range(0, len(web_docs), 4): input_docs = web_docs[i:i+4] window_result = self.qa_chain({"input_documents": input_docs, "question": question}, return_only_outputs=True) results.append(f"Response from window {i} - {window_result}") results_docs = [Document(page_content="\n".join(results), metadata={"source": url})] return self.qa_chain({"input_documents": results_docs, "question": question}, return_only_outputs=True) async def _arun(self, url: str, question: str) -> str: raise NotImplementedError query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm)) # !pip install duckduckgo_search web_search = DuckDuckGoSearchRun() ######################################################## zapier # get from https://platform.openai.com/ # os.environ["OPENAI_API_KEY"] = os.environ.get("OPENAI_API_KEY", "") # # get from https://nla.zapier.com/demo/provider/debug (under User Information, after logging in): # os.environ["ZAPIER_NLA_API_KEY"] = os.environ.get("ZAPIER_NLA_API_KEY", "") # from langchain.agents.agent_toolkits import ZapierToolkit # from langchain.agents import AgentType # from langchain.utilities.zapier import ZapierNLAWrapper # zapier = ZapierNLAWrapper() # zapier_toolkit = ZapierToolkit.from_zapier_nla_wrapper(zapier) # # agent = initialize_agent( # # toolkit.get_tools(), llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True # # ) ######################################################## zapier end ######################################################## youtube search # from langchain.tools import YouTubeSearchTool # youtube_tool = YouTubeSearchTool() # #tool.run("lex friedman") ######################################################## youtube search end ######################################################## wolfram beginning # import os # os.environ["WOLFRAM_ALPHA_APPID"] = "" # from langchain.utilities.wolfram_alpha import WolframAlphaAPIWrapper # wolfram_tool = WolframAlphaAPIWrapper() # #wolfram.run("What is 2x+5 = -3x + 7?") ######################################################## wolfram end ######################################################## Wikipedia beginning # from langchain.utilities import WikipediaAPIWrapper # wikipedia_tool = WikipediaAPIWrapper() # #wikipedia.run("HUNTER X HUNTER") ######################################################## Wikipedia beginning ######################################################## search tools beginning # google_serpe_tools = load_tools(["google-serper"]) ######################################################## search tools end ######################################################## requests # from langchain.agents import load_tools # requests_tools = load_tools(["requests_all"]) # # requests_tools # requests_tools[0].requests_wrapper # from langchain.utilities import TextRequestsWrapper # requests = TextRequestsWrapper() # requests.get("https://www.google.com") ######################################################## requests ######################################################## pubmed # from langchain.tools import PubmedQueryRun # pubmed_tool = PubmedQueryRun() # pubmed_tool.run("chatgpt") ######################################################## pubmed emd ######################################################## IFTTT WebHooks # from langchain.tools.ifttt import IFTTTWebhook # import os # key = os.environ["IFTTTKey"] # url = f"https://maker.ifttt.com/trigger/spotify/json/with/key/{key}" # IFFT_tool = IFTTTWebhook( # name="Spotify", description="Add a song to spotify playlist", url=url # ) ######################################################## IFTTT WebHooks end ######################################################## huggingface # from langchain.agents import load_huggingface_tool # hf_tool = load_huggingface_tool("lysandre/hf-model-downloads") # print(f"{tool.name}: {tool.description}") ######################################################## huggingface end ######################################################## graphql # from langchain import OpenAI # from langchain.agents import load_tools, initialize_agent, AgentType # from langchain.utilities import GraphQLAPIWrapper # llm = OpenAI(temperature=0) # graphql_tool = load_tools( # ["graphql"], # graphql_endpoint="https://swapi-graphql.netlify.app/.netlify/functions/index" # ) # agent = initialize_agent( # tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True # ) ######################################################## graphql end ######################################################## graphql # from langchain.agents import initialize_agent # from langchain.llms import OpenAI # from gradio_tools.tools import ( # StableDiffusionTool, # ImageCaptioningTool, # StableDiffusionPromptGeneratorTool, # TextToVideoTool, # ) # from langchain.memory import ConversationBufferMemory # hf_model_tools = [ # StableDiffusionTool().langchain, # ImageCaptioningTool().langchain, # StableDiffusionPromptGeneratorTool().langchain, # TextToVideoTool().langchain, # ] ######################## ######################################################## graphql end ######################## ######################################################## file system from langchain.tools.file_management import ( ReadFileTool, CopyFileTool, DeleteFileTool, MoveFileTool, WriteFileTool, ListDirectoryTool, ) from langchain.agents.agent_toolkits import FileManagementToolkit from tempfile import TemporaryDirectory # We'll make a temporary directory to avoid clutter working_directory = TemporaryDirectory() toolkit = FileManagementToolkit( root_dir=str(working_directory.name) ) # If you don't provide a root_dir, operations will default to the current working directory toolkit.get_tools() file_management_tools = FileManagementToolkit( root_dir=str(working_directory.name), selected_tools=["read_file", "write_file", "list_directory"], ).get_tools() read_tool, write_tool, list_tool = file_management_tools write_tool.run({"file_path": "example.txt", "text": "Hello World!"}) # List files in the working directory list_tool.run({}) ######################### BRAVE # from langchain.tools import BraveSearch # brave_api_key = os.environ["BRAVE_API_KEY"] # brave_tool = BraveSearch.from_api_key(api_key=brave_api_key, search_kwargs={"count": 3}) ######################### BRAVE END ######################### ARXVIV # from langchain.chat_models import ChatOpenAI # from langchain.agents import load_tools, initialize_agent, AgentType # arxviv_tool = load_tools( # ["arxiv"], # ) # ############ # from langchain.utilities import ArxivAPIWrapper # arxiv_tool = ArxivAPIWrapper() # ################################# GMAIL TOOKKIT # from langchain.agents.agent_toolkits import GmailToolkit # gmail_toolkit = GmailToolkit() # from langchain.tools.gmail.utils import build_resource_service, get_gmail_credentials # # Can review scopes here https://developers.google.com/gmail/api/auth/scopes # # For instance, readonly scope is 'https://www.googleapis.com/auth/gmail.readonly' # credentials = get_gmail_credentials( # token_file="token.json", # scopes=["https://mail.google.com/"], # client_secrets_file="credentials.json", # ) # api_resource = build_resource_service(credentials=credentials) # gmail_toolkit_2 = GmailToolkit(api_resource=api_resource) # gmail_tools = toolkit.get_tools() # from langchain import OpenAI # from langchain.agents import initialize_agent, AgentType # agent = initialize_agent( # tools=toolkit.get_tools(), # llm=llm, # agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, # ) ################################# GMAIL TOOKKIT JSON AGENT # import os # import yaml # from langchain.agents import create_json_agent, AgentExecutor # from langchain.agents.agent_toolkits import JsonToolkit # from langchain.chains import LLMChain # from langchain.llms.openai import OpenAI # from langchain.requests import TextRequestsWrapper # from langchain.tools.json.tool import JsonSpec # with open("openai_openapi.yml") as f: # data = yaml.load(f, Loader=yaml.FullLoader) # json_spec = JsonSpec(dict_=data, max_value_length=4000) # json_toolkit = JsonToolkit(spec=json_spec) # json_agent_executor = create_json_agent( # llm=OpenAI(temperature=0), toolkit=json_toolkit, verbose=True # ) # json_agent_executor.run( # "What are the required parameters in the request body to the /completions endpoint?" # ) # ################################# OFFICE 365 TOOLKIT # from langchain.agents.agent_toolkits import O365Toolkit # threesixfive_toolkit = O365Toolkit() # threesixfive_toolkit = toolkit.get_tools() ################################# OFFICE 365 TOOLKIT END # import os, yaml # wget https://raw.githubusercontent.com/openai/openai-openapi/master/openapi.yaml # mv openapi.yaml openai_openapi.yaml # wget https://www.klarna.com/us/shopping/public/openai/v0/api-docs # mv api-docs klarna_openapi.yaml # wget https://raw.githubusercontent.com/APIs-guru/openapi-directory/main/APIs/spotify.com/1.0.0/openapi.yaml # mv openapi.yaml spotify_openapi.yaml # from langchain.agents.agent_toolkits.openapi.spec import reduce_openapi_spec # with open("openai_openapi.yaml") as f: # raw_openai_api_spec = yaml.load(f, Loader=yaml.Loader) # openai_api_spec = reduce_openapi_spec(raw_openai_api_spec) # with open("klarna_openapi.yaml") as f: # raw_klarna_api_spec = yaml.load(f, Loader=yaml.Loader) # klarna_api_spec = reduce_openapi_spec(raw_klarna_api_spec) # with open("spotify_openapi.yaml") as f: # raw_spotify_api_spec = yaml.load(f, Loader=yaml.Loader) # spotify_api_spec = reduce_openapi_spec(raw_spotify_api_spec) # import spotipy.util as util # from langchain.requests import RequestsWrapper # def construct_spotify_auth_headers(raw_spec: dict): # scopes = list( # raw_spec["components"]["securitySchemes"]["oauth_2_0"]["flows"][ # "authorizationCode" # ]["scopes"].keys() # ) # access_token = util.prompt_for_user_token(scope=",".join(scopes)) # return {"Authorization": f"Bearer {access_token}"} # # Get API credentials. # headers = construct_spotify_auth_headers(raw_spotify_api_spec) # requests_wrapper = RequestsWrapper(headers=headers) # endpoints = [ # (route, operation) # for route, operations in raw_spotify_api_spec["paths"].items() # for operation in operations # if operation in ["get", "post"] # ] # len(endpoints) # import tiktoken # enc = tiktoken.encoding_for_model("text-davinci-003") # def count_tokens(s): # return len(enc.encode(s)) # count_tokens(yaml.dump(raw_spotify_api_spec)) # from langchain.llms.openai import OpenAI # from langchain.agents.agent_toolkits.openapi import planner # llm = OpenAI(model_name="gpt-4", temperature=0.0, openai_api_key=openai_api_key) # spotify_agent = planner.create_openapi_agent(spotify_api_spec, requests_wrapper, llm) # user_query = ( # "make me a playlist with the first song from kind of blue. call it machine blues." # ) # spotify_agent.run(user_query) # from langchain.agents import create_openapi_agent # from langchain.agents.agent_toolkits import OpenAPIToolkit # from langchain.llms.openai import OpenAI # from langchain.requests import TextRequestsWrapper # from langchain.tools.json.tool import JsonSpec # with open("openai_openapi.yaml") as f: # data = yaml.load(f, Loader=yaml.FullLoader) # json_spec = JsonSpec(dict_=data, max_value_length=4000) # openapi_toolkit = OpenAPIToolkit.from_llm( # OpenAI(temperature=0), json_spec, openai_requests_wrapper, verbose=True # ) # openapi_agent_executor = create_openapi_agent( # llm=OpenAI(temperature=0), toolkit=openapi_toolkit, verbose=True # ) ############################################ Natural Language APIs start # from typing import List, Optional # from langchain.chains import LLMChain # from langchain.llms import OpenAI # from langchain.prompts import PromptTemplate # from langchain.requests import Requests # from langchain.tools import APIOperation, OpenAPISpec # from langchain.agents import AgentType, Tool, initialize_agent # from langchain.agents.agent_toolkits import NLAToolkit # # Select the LLM to use. Here, we use text-davinci-003 # llm = OpenAI( # temperature=0, max_tokens=700, openai_api_key=openai_api_key # ) # You can swap between different core LLM's here. # speak_toolkit = NLAToolkit.from_llm_and_url(llm, "https://api.speak.com/openapi.yaml") # klarna_toolkit = NLAToolkit.from_llm_and_url( # llm, "https://www.klarna.com/us/shopping/public/openai/v0/api-docs/" # ) # # Slightly tweak the instructions from the default agent # openapi_format_instructions = """Use the following format: # Question: the input question you must answer # Thought: you should always think about what to do # Action: the action to take, should be one of [{tool_names}] # Action Input: what to instruct the AI Action representative. # Observation: The Agent's response # ... (this Thought/Action/Action Input/Observation can repeat N times) # Thought: I now know the final answer. User can't see any of my observations, API responses, links, or tools. # Final Answer: the final answer to the original input question with the right amount of detail # When responding with your Final Answer, remember that the person you are responding to CANNOT see any of your Thought/Action/Action Input/Observations, so if there is any relevant information there you need to include it explicitly in your response.""" # natural_language_tools = speak_toolkit.get_tools() + klarna_toolkit.get_tools() # mrkl = initialize_agent( # natural_language_tools, # llm, # agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, # verbose=True, # agent_kwargs={"format_instructions": openapi_format_instructions}, # ) # mrkl.run( # "I have an end of year party for my Italian class and have to buy some Italian clothes for it" # ) # spoonacular_api = os.environ["SPOONACULAR_KEY"] # spoonacular_api_key = spoonacular_api # requests = Requests(headers={"x-api-key": spoonacular_api_key}) # spoonacular_toolkit = NLAToolkit.from_llm_and_url( # llm, # "https://spoonacular.com/application/frontend/downloads/spoonacular-openapi-3.json", # requests=requests, # max_text_length=1800, # If you want to truncate the response text # ) # natural_language_api_tools = ( # speak_toolkit.get_tools() # + klarna_toolkit.get_tools() # + spoonacular_toolkit.get_tools()[:30] # ) # print(f"{len(natural_language_api_tools)} tools loaded.") # natural_language_api_tools[1].run( # "Tell the LangChain audience to 'enjoy the meal' in Italian, please!" # ) ############################################ Natural Language APIs start END ############################################ python tool # from langchain.agents.agent_toolkits import create_python_agent # from langchain.tools.python.tool import PythonREPLTool # from langchain.python import PythonREPL # from langchain.llms.openai import OpenAI # from langchain.agents.agent_types import AgentType # from langchain.chat_models import ChatOpenAI # #test # # PythonREPLTool() # python_repl_tool = PythonREPLTool() ############################################ python tool ############### VECTOR STORE CHROMA, MAKE OCEAN # from langchain.embeddings.openai import OpenAIEmbeddings # from langchain.vectorstores import Chroma # from langchain.text_splitter import CharacterTextSplitter # from langchain import OpenAI, VectorDBQA # llm = OpenAI(temperature=0, openai_api_key=openai_api_key) # from langchain.document_loaders import TextLoader # loader = TextLoader("../../../state_of_the_union.txt") # documents = loader.load() # text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) # texts = text_splitter.split_documents(documents) # embeddings = OpenAIEmbeddings() # state_of_union_store = Chroma.from_documents( # texts, embeddings, collection_name="state-of-union" # ) # from langchain.document_loaders import WebBaseLoader # loader = WebBaseLoader("https://beta.ruff.rs/docs/faq/") # docs = loader.load() # ruff_texts = text_splitter.split_documents(docs) # ruff_store = Chroma.from_documents(ruff_texts, embeddings, collection_name="ruff") # ############ Initialize Toolkit and Agent # from langchain.agents.agent_toolkits import ( # create_vectorstore_agent, # VectorStoreToolkit, # VectorStoreInfo, # ) # vectorstore_info = VectorStoreInfo( # name="state_of_union_address", # description="the most recent state of the Union adress", # vectorstore=state_of_union_store, # ) # vectorstore_toolkit = VectorStoreToolkit(vectorstore_info=vectorstore_info) # agent_executor = create_vectorstore_agent(llm=llm, toolkit=toolkit, verbose=True) ######################### Multiple Vectorstores #We can also easily use this initialize an agent with multiple vectorstores and use the agent to route between them. To do this. This agent is optimized for routing, so it is a different toolkit and initializer. # from langchain.agents.agent_toolkits import ( # create_vectorstore_router_agent, # VectorStoreRouterToolkit, # VectorStoreInfo, # ) # ruff_vectorstore_info = VectorStoreInfo( # name="ruff", # description="Information about the Ruff python linting library", # vectorstore=ruff_store, # ) # router_toolkit = VectorStoreRouterToolkit( # vectorstores=[vectorstore_info, ruff_vectorstore_info], llm=llm # ) # # ############################################### ===========================> Whisperx speech to text # import os # from pydantic import BaseModel, Field # from pydub import AudioSegment # from pytube import YouTube # import whisperx # from langchain.tools import tool # hf_api_key = os.environ["HF_API_KEY"] # # define a custom input schema for the youtube url # class YouTubeVideoInput(BaseModel): # video_url: str = Field(description="YouTube Video URL to transcribe") # def download_youtube_video(video_url, audio_format='mp3'): # audio_file = f'video.{audio_format}' # # Download video # yt = YouTube(video_url) # yt_stream = yt.streams.filter(only_audio=True).first() # yt_stream.download(filename='video.mp4') # # Convert video to audio # video = AudioSegment.from_file("video.mp4", format="mp4") # video.export(audio_file, format=audio_format) # os.remove("video.mp4") # return audio_file # @tool("transcribe_youtube_video", args_schema=YouTubeVideoInput, return_direct=True) # def transcribe_youtube_video(video_url: str) -> str: # """Transcribes a YouTube video.""" # audio_file = download_youtube_video(video_url) # device = "cuda" # batch_size = 16 # compute_type = "float16" # # 1. Transcribe with original Whisper (batched) # model = whisperx.load_model("large-v2", device, compute_type=compute_type) # audio = whisperx.load_audio(audio_file) # result = model.transcribe(audio, batch_size=batch_size) # # 2. Align Whisper output # model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device) # result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False) # # 3. Assign speaker labels # diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device) # diarize_segments = diarize_model(audio_file) # try: # segments = result["segments"] # transcription = " ".join(segment['text'] for segment in segments) # return transcription # except KeyError: # print("The key 'segments' is not found in the result.") # ################################################### BASE WHISPER TOOL # from typing import Optional, Type # from pydantic import BaseModel, Field # from langchain.tools import BaseTool # from langchain.callbacks.manager import ( # AsyncCallbackManagerForToolRun, # CallbackManagerForToolRun, # ) # import requests # import whisperx # class AudioInput(BaseModel): # audio_file: str = Field(description="Path to audio file") # class TranscribeAudioTool(BaseTool): # name = "transcribe_audio" # description = "Transcribes an audio file using WhisperX" # args_schema: Type[AudioInput] = AudioInput # def _run( # self, # audio_file: str, # device: str = "cuda", # batch_size: int = 16, # compute_type: str = "float16", # run_manager: Optional[CallbackManagerForToolRun] = None, # ) -> str: # """Use the tool.""" # model = whisperx.load_model("large-v2", device, compute_type=compute_type) # audio = whisperx.load_audio(audio_file) # result = model.transcribe(audio, batch_size=batch_size) # model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device) # result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False) # diarize_model = whisperx.DiarizationPipeline(use_auth_token=hf_api_key, device=device) # diarize_segments = diarize_model(audio_file) # try: # segments = result["segments"] # transcription = " ".join(segment['text'] for segment in segments) # return transcription # except KeyError: # print("The key 'segments' is not found in the result.") # async def _arun( # self, # audio_file: str, # device: str = "cuda", # batch_size: int = 16, # compute_type: str = "float16", # run_manager: Optional[AsyncCallbackManagerForToolRun] = None, # ) -> str: # """Use the tool asynchronously.""" # raise NotImplementedError("transcribe_audio does not support async") ###########=========================> #======> Calculator # from langchain import LLMMathChain # llm_math_chain = LLMMathChain.from_llm(llm=llm, verbose=True) # math_tool = Tool( # name="Calculator", # func=llm_math_chain.run, # description="useful for when you need to answer questions about math" # ), # #####==========================================================================> TOOLS # from langchain.tools.human.tool import HumanInputRun # from langchain.tools import BaseTool, DuckDuckGoSearchRun