import time
import uuid
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import click
import requests
from requests import get
from uuid import uuid4
from re import findall
from requests.exceptions import RequestException
from curl_cffi.requests import get, RequestsError
import g4f
from random import randint
from PIL import Image
import io
import re
import json
import yaml
from ..AIutel import Optimizers
from ..AIutel import Conversation
from ..AIutel import AwesomePrompts, sanitize_stream
from ..AIbase import Provider, AsyncProvider
from Helpingai_T2 import Perplexity
from webscout import exceptions
from typing import Any, AsyncGenerator, Dict
import logging
import httpx
#-------------------------------------------------------youchat--------------------------------------------------------
class YouChat(Provider):
    """Provider that sends prompts to the you.com ``streamingSearch`` endpoint.

    Responses arrive as a line-based event stream; the ``youChatToken``
    fields of the JSON events are concatenated into the reply text.
    """

    def __init__(
        self,
        is_conversation: bool = True,
        max_tokens: int = 600,
        timeout: int = 30,
        intro: str = None,
        filepath: str = None,
        update_file: bool = True,
        proxies: dict = None,
        history_offset: int = 10250,
        act: str = None,
    ):
        """Set up the HTTP session, request template and conversation state.

        Args:
            is_conversation (bool, optional): Forwarded to `Conversation`. Defaults to True.
            max_tokens (int, optional): Stored as `max_tokens_to_sample` and forwarded
                to `Conversation`. Defaults to 600.
            timeout (int, optional): HTTP request timeout in seconds. Defaults to 30.
            intro (str, optional): Conversation introductory prompt, used when `act`
                is not given. Defaults to None.
            filepath (str, optional): Forwarded to `Conversation`. Defaults to None.
            update_file (bool, optional): Forwarded to `Conversation`. Defaults to True.
            proxies (dict, optional): Proxies assigned to the HTTP session.
                Defaults to None (an empty, per-instance dict).
            history_offset (int, optional): Assigned to `conversation.history_offset`.
                Defaults to 10250.
            act (str, optional): Key passed to `AwesomePrompts().get_act` to obtain
                the intro prompt. Defaults to None.
        """
        self.session = requests.Session()
        self.is_conversation = is_conversation
        self.max_tokens_to_sample = max_tokens
        self.chat_endpoint = "https://you.com/api/streamingSearch"
        self.stream_chunk_size = 64
        self.timeout = timeout
        self.last_response = {}
        # Query-string template; "q" is filled in on every `ask` call.
        self.payload = {
            "q": "",
            "page": 1,
            "count": 10,
            "safeSearch": "Off",
            "onShoppingPage": False,
            "mkt": "",
            "responseFilter": "WebPages,Translations,TimeZone,Computation,RelatedSearches",
            "domain": "youchat",
            "queryTraceId": str(uuid.uuid4()),
            "conversationTurnId": str(uuid.uuid4()),
            "pastChatLength": 0,
            "selectedChatMode": "default",
            "chat": "[]",
        }
        self.headers = {
            "cache-control": "no-cache",
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
            'Referer': f'https://you.com/search?q={self.payload["q"]}&fromSearchBar=true&tbm=youchat&chatMode=default'
        }

        # Materialised as a tuple: a generator would be exhausted by the first
        # membership test and make every later optimizer lookup fail.
        self.__available_optimizers = tuple(
            method
            for method in dir(Optimizers)
            if callable(getattr(Optimizers, method)) and not method.startswith("__")
        )
        self.session.headers.update(self.headers)
        # NOTE: this assigns the class attribute, so it affects every
        # Conversation created afterwards, not just this provider's.
        Conversation.intro = (
            AwesomePrompts().get_act(
                act, raise_not_found=True, default=None, case_insensitive=True
            )
            if act
            else intro or Conversation.intro
        )
        self.conversation = Conversation(
            is_conversation, self.max_tokens_to_sample, filepath, update_file
        )
        self.conversation.history_offset = history_offset
        # Fresh dict per instance instead of a shared mutable default.
        self.session.proxies = proxies if proxies is not None else {}

    def ask(
        self,
        prompt: str,
        stream: bool = False,
        raw: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
    ) -> dict:
        """Send a prompt and return the generated response.

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            raw (bool, optional): When streaming, yield the received line of each
                token event instead of a dict. Defaults to False.
            optimizer (str, optional): Name of a callable on `Optimizers` applied
                to the prompt. Defaults to None.
            conversationally (bool, optional): Apply the optimizer to the complete
                conversation prompt rather than the bare prompt. Defaults to False.

        Returns:
            dict: ``{"text": <full reply>}`` when `stream` is False. When `stream`
            is True, a generator yielding ``{"text": <reply so far>}`` (or the raw
            line when `raw` is True) once per received token.

        Raises:
            Exception: If `optimizer` is not an available optimizer name.
            exceptions.FailedToGenerateResponseError: If the HTTP response is not OK
                (raised on first iteration when streaming).
        """
        conversation_prompt = self.conversation.gen_complete_prompt(prompt)
        if optimizer:
            if optimizer in self.__available_optimizers:
                conversation_prompt = getattr(Optimizers, optimizer)(
                    conversation_prompt if conversationally else prompt
                )
            else:
                raise Exception(
                    f"Optimizer is not one of {self.__available_optimizers}"
                )
        self.session.headers.update(self.headers)
        self.session.headers.update(
            dict(
                cookie=f"safesearch_guest=Off; uuid_guest={str(uuid4())}",
            )
        )
        # Send the prepared prompt so history and optimizer actually take effect.
        self.payload["q"] = conversation_prompt
        # Snapshot so a lazily-consumed stream is unaffected by later `ask` calls.
        params = dict(self.payload)
        sse_prefix = "data:"

        def for_stream():
            response = self.session.get(
                self.chat_endpoint,
                params=params,
                stream=True,
                timeout=self.timeout,
            )
            if not response.ok:
                raise exceptions.FailedToGenerateResponseError(
                    f"Failed to generate response - ({response.status_code}, {response.reason})"
                )

            streaming_response = ""
            for line in response.iter_lines(
                decode_unicode=True, chunk_size=self.stream_chunk_size
            ):
                if not line:
                    continue
                # requests yields bytes when the response declares no encoding.
                if isinstance(line, bytes):
                    line = line.decode("utf-8", errors="replace")
                # Strip only the leading field name; "data:" inside the JSON
                # body is part of the content and must be preserved.
                data = line[len(sse_prefix):] if line.startswith(sse_prefix) else line
                try:
                    event = json.loads(data)
                except json.JSONDecodeError:
                    # Non-JSON lines (e.g. event names) carry no token.
                    continue
                if not isinstance(event, dict):
                    continue
                token = event.get("youChatToken")
                if not isinstance(token, str):
                    continue
                streaming_response += token
                resp = dict(text=streaming_response)
                self.last_response.update(resp)
                yield line if raw else resp

            self.last_response.update(dict(text=streaming_response))
            self.conversation.update_chat_history(
                prompt, self.get_message(self.last_response)
            )

        def for_non_stream():
            # Drain the stream; the accumulated text lands in `last_response`.
            for _ in for_stream():
                pass
            return self.last_response

        return for_stream() if stream else for_non_stream()

    def chat(
        self,
        prompt: str,
        stream: bool = False,
        optimizer: str = None,
        conversationally: bool = False,
    ) -> str:
        """Generate response `str`

        Args:
            prompt (str): Prompt to be send.
            stream (bool, optional): Flag for streaming response. Defaults to False.
            optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None.
            conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False.

        Returns:
            str: Response generated. When `stream` is True, a generator yielding
            the reply text accumulated so far.
        """

        def for_stream():
            for response in self.ask(
                prompt, True, optimizer=optimizer, conversationally=conversationally
            ):
                yield self.get_message(response)

        def for_non_stream():
            return self.get_message(
                self.ask(
                    prompt,
                    False,
                    optimizer=optimizer,
                    conversationally=conversationally,
                )
            )

        return for_stream() if stream else for_non_stream()

    def get_message(self, response: dict) -> str:
        """Retrieves message only from response

        Args:
            response (dict): Response generated by `self.ask`

        Returns:
            str: Message extracted
        """
        assert isinstance(response, dict), "Response should be of dict data-type only"
        return response["text"]