|
import time |
|
import uuid |
|
from selenium import webdriver |
|
from selenium.webdriver.chrome.options import Options |
|
from selenium.webdriver.common.by import By |
|
from selenium.webdriver.support import expected_conditions as EC |
|
from selenium.webdriver.support.ui import WebDriverWait |
|
import click |
|
import requests |
|
from requests import get |
|
from uuid import uuid4 |
|
from re import findall |
|
from requests.exceptions import RequestException |
|
from curl_cffi.requests import get, RequestsError |
|
import g4f |
|
from random import randint |
|
from PIL import Image |
|
import io |
|
import re |
|
import json |
|
import yaml |
|
from ..AIutel import Optimizers |
|
from ..AIutel import Conversation |
|
from ..AIutel import AwesomePrompts, sanitize_stream |
|
from ..AIbase import Provider, AsyncProvider |
|
from Helpingai_T2 import Perplexity |
|
from webscout import exceptions |
|
from typing import Any, AsyncGenerator, Dict |
|
import logging |
|
import httpx |
|
|
|
class REKA(Provider):
    """Provider that sends chat prompts to the Reka chat endpoint over HTTP."""

    def __init__(
        self,
        api_key: str,
        is_conversation: bool = True,
        max_tokens: int = 600,
        timeout: int = 30,
        intro: str = None,
        filepath: str = None,
        update_file: bool = True,
        proxies: dict = None,
        history_offset: int = 10250,
        act: str = None,
        model: str = "reka-core",
        system_prompt: str = "Be Helpful and Friendly. Keep your response straightforward, short and concise",
        use_search_engine: bool = False,
        use_code_interpreter: bool = False,
    ):
        """Instantiates REKA

        Args:
            api_key (str): Token sent as the `Authorization: Bearer ...` header.
            is_conversation (bool, optional): Flag for chatting conversationally. Defaults to True
            max_tokens (int, optional): Maximum number of tokens to be generated upon completion.
                Stored and handed to `Conversation`; it is not part of the request payload. Defaults to 600.
            timeout (int, optional): Http request timeout. Defaults to 30.
            intro (str, optional): Conversation introductory prompt. Defaults to None.
            filepath (str, optional): Path to file containing conversation history. Defaults to None.
            update_file (bool, optional): Add new prompts and responses to the file. Defaults to True.
            proxies (dict, optional): Http request proxies. Defaults to None, which is treated as
                an empty mapping (a fresh dict per instance).
            history_offset (int, optional): Limit conversation history to this number of last texts. Defaults to 10250.
            act (str|int, optional): Awesome prompt key or index. (Used as intro). Defaults to None.
            model (str, optional): REKA model name. Defaults to "reka-core".
            system_prompt (str, optional): System prompt for REKA. Defaults to "Be Helpful and Friendly. Keep your response straightforward, short and concise".
            use_search_engine (bool, optional): Whether to use the search engine. Defaults to False.
            use_code_interpreter (bool, optional): Whether to use the code interpreter. Defaults to False.
        """
        self.session = requests.Session()
        self.is_conversation = is_conversation
        self.max_tokens_to_sample = max_tokens
        self.api_endpoint = "https://chat.reka.ai/api/chat"
        self.stream_chunk_size = 64
        self.timeout = timeout
        self.last_response = {}
        self.model = model
        self.system_prompt = system_prompt
        self.use_search_engine = use_search_engine
        self.use_code_interpreter = use_code_interpreter
        self.access_token = api_key
        self.headers = {
            "Authorization": f"Bearer {self.access_token}",
        }

        # Materialised as a tuple: a generator expression here would be
        # exhausted by the first membership test in `ask`, making every later
        # optimizer lookup fail and the error message unreadable.
        self.__available_optimizers = tuple(
            method
            for method in dir(Optimizers)
            if callable(getattr(Optimizers, method)) and not method.startswith("__")
        )
        self.session.headers.update(self.headers)
        # NOTE: this assigns on the Conversation class itself, not on the
        # instance created below.
        Conversation.intro = (
            AwesomePrompts().get_act(
                act, raise_not_found=True, default=None, case_insensitive=True
            )
            if act
            else intro or Conversation.intro
        )
        self.conversation = Conversation(
            is_conversation, self.max_tokens_to_sample, filepath, update_file
        )
        self.conversation.history_offset = history_offset
        # A new dict per instance when none is supplied, so instances never
        # share one mutable default proxies mapping.
        self.session.proxies = proxies if proxies is not None else {}

    def ask(
        self,
        prompt: str,
        stream: bool = False,
        raw: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
    ) -> dict:
        """Chat with AI

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            raw (bool, optional): Stream back raw response as received. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.

        Returns:
            dict : {}
            When `stream` is True a generator is returned instead; it yields
            one parsed JSON object per response line (or the raw line when
            `raw` is True).
        ```json
        {
           "text" : "How may I assist you today?"
        }
        ```

        Raises:
            Exception: If `optimizer` is not an available optimizer name, or
                if the HTTP response status is not OK.
        """
        conversation_prompt = self.conversation.gen_complete_prompt(prompt)
        if optimizer:
            if optimizer not in self.__available_optimizers:
                raise Exception(
                    f"Optimizer is not one of {self.__available_optimizers}"
                )
            # The optimizer receives the history-expanded prompt only when
            # chatting conversationally; otherwise it gets the bare prompt.
            conversation_prompt = getattr(Optimizers, optimizer)(
                conversation_prompt if conversationally else prompt
            )

        # Re-applied on every call so changes made to `self.headers` after
        # construction reach the session.
        self.session.headers.update(self.headers)
        payload = {
            "conversation_history": [
                {"type": "human", "text": f"## SYSTEM PROMPT: {self.system_prompt}\n\n## QUERY: {conversation_prompt}"},
            ],
            "stream": stream,
            "use_search_engine": self.use_search_engine,
            "use_code_interpreter": self.use_code_interpreter,
            "model_name": self.model,
        }

        def for_stream():
            # The context manager closes the streamed response even when the
            # request fails or the consumer stops iterating early.
            with self.session.post(
                self.api_endpoint, json=payload, stream=True, timeout=self.timeout
            ) as response:
                if not response.ok:
                    raise Exception(
                        f"Failed to generate response - ({response.status_code}, {response.reason}) - {response.text}"
                    )

                # Fresh dict per request: avoids carrying stale keys over from
                # an earlier call and avoids mutating a dict already returned
                # to a previous caller.
                self.last_response = {}
                for value in response.iter_lines(
                    decode_unicode=True,
                    chunk_size=self.stream_chunk_size,
                ):
                    try:
                        resp = json.loads(value)
                    except json.decoder.JSONDecodeError:
                        # Lines that are not JSON (e.g. empty lines) are skipped.
                        continue
                    if not isinstance(resp, dict):
                        # Only JSON objects can be merged into last_response.
                        continue
                    self.last_response.update(resp)
                    yield value if raw else resp

            self.conversation.update_chat_history(
                prompt, self.get_message(self.last_response)
            )

        def for_non_stream():
            # Drain the stream so last_response and the chat history are
            # populated, then hand back the accumulated response.
            for _ in for_stream():
                pass
            return self.last_response

        return for_stream() if stream else for_non_stream()

    def chat(
        self,
        prompt: str,
        stream: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
    ) -> str:
        """Generate response `str`

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.

        Returns:
            str: Response generated. When `stream` is True a generator
            yielding the message of each streamed response is returned instead.
        """

        def for_stream():
            for response in self.ask(
                prompt, True, optimizer=optimizer, conversationally=conversationally
            ):
                yield self.get_message(response)

        def for_non_stream():
            return self.get_message(
                self.ask(
                    prompt,
                    False,
                    optimizer=optimizer,
                    conversationally=conversationally,
                )
            )

        return for_stream() if stream else for_non_stream()

    def get_message(self, response: dict) -> str:
        """Retrieves message only from response

        Args:
            response (dict): Response generated by `self.ask`

        Returns:
            str: Value stored under the "text" key, or None when the key is absent.

        Raises:
            AssertionError: If `response` is not a dict.
        """
        assert isinstance(response, dict), "Response should be of dict data-type only"
        return response.get("text")