Spaces:
Runtime error
Runtime error
import time | |
import types | |
from typing import Any, Dict, List, Tuple, Union | |
from langchain.agents import AgentExecutor | |
from langchain.input import get_color_mapping | |
from langchain.schema import AgentAction, AgentFinish | |
from .translator import Translator | |
class AgentExecutorWithTranslation(AgentExecutor): | |
translator: Translator = Translator() | |
def prep_outputs( | |
self, | |
inputs: Dict[str, str], | |
outputs: Dict[str, str], | |
return_only_outputs: bool = False, | |
) -> Dict[str, str]: | |
try: | |
outputs = super().prep_outputs(inputs, outputs, return_only_outputs) | |
except ValueError as e: | |
return outputs | |
else: | |
if "input" in outputs: | |
outputs = self.translator(outputs) | |
return outputs | |
class Executor(AgentExecutorWithTranslation): | |
def _call(self, inputs: Dict[str, str]) -> Dict[str, Any]: | |
"""Run text through and get agent response.""" | |
# Construct a mapping of tool name to tool for easy lookup | |
name_to_tool_map = {tool.name: tool for tool in self.tools} | |
# We construct a mapping from each tool to a color, used for logging. | |
color_mapping = get_color_mapping( | |
[tool.name for tool in self.tools], excluded_colors=["green"] | |
) | |
intermediate_steps: List[Tuple[AgentAction, str]] = [] | |
# Let's start tracking the iterations the agent has gone through | |
iterations = 0 | |
time_elapsed = 0.0 | |
start_time = time.time() | |
# We now enter the agent loop (until it returns something). | |
while self._should_continue(iterations, time_elapsed): | |
next_step_output = self._take_next_step( | |
name_to_tool_map, color_mapping, inputs, intermediate_steps | |
) | |
if isinstance(next_step_output, AgentFinish): | |
yield self._return(next_step_output, intermediate_steps) | |
return | |
for i, output in enumerate(next_step_output): | |
agent_action = output[0] | |
tool_logo = None | |
for tool in self.tools: | |
if tool.name == agent_action.tool: | |
tool_logo = tool.tool_logo_md | |
if isinstance(output[1], types.GeneratorType): | |
logo = f"{tool_logo}" if tool_logo is not None else "" | |
yield ( | |
AgentAction("", agent_action.tool_input, agent_action.log), | |
f"Further use other tool {logo} to answer the question.", | |
) | |
for out in output[1]: | |
yield out | |
next_step_output[i] = (agent_action, out) | |
else: | |
for tool in self.tools: | |
if tool.name == agent_action.tool: | |
yield ( | |
AgentAction( | |
tool_logo, agent_action.tool_input, agent_action.log | |
), | |
output[1], | |
) | |
intermediate_steps.extend(next_step_output) | |
if len(next_step_output) == 1: | |
next_step_action = next_step_output[0] | |
# See if tool should return directly | |
tool_return = self._get_tool_return(next_step_action) | |
if tool_return is not None: | |
yield self._return(tool_return, intermediate_steps) | |
return | |
iterations += 1 | |
time_elapsed = time.time() - start_time | |
output = self.agent.return_stopped_response( | |
self.early_stopping_method, intermediate_steps, **inputs | |
) | |
yield self._return(output, intermediate_steps) | |
return | |
def __call__( | |
self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False | |
) -> Dict[str, Any]: | |
"""Run the logic of this chain and add to output if desired. | |
Args: | |
inputs: Dictionary of inputs, or single input if chain expects | |
only one param. | |
return_only_outputs: boolean for whether to return only outputs in the | |
response. If True, only new keys generated by this chain will be | |
returned. If False, both input keys and new keys generated by this | |
chain will be returned. Defaults to False. | |
""" | |
inputs = self.prep_inputs(inputs) | |
self.callback_manager.on_chain_start( | |
{"name": self.__class__.__name__}, | |
inputs, | |
verbose=self.verbose, | |
) | |
try: | |
for output in self._call(inputs): | |
if type(output) is dict: | |
output = self.prep_outputs(inputs, output, return_only_outputs) | |
yield output | |
except (KeyboardInterrupt, Exception) as e: | |
self.callback_manager.on_chain_error(e, verbose=self.verbose) | |
raise e | |
self.callback_manager.on_chain_end(output, verbose=self.verbose) | |
# return self.prep_outputs(inputs, output, return_only_outputs) | |
return output | |