import g4f from webscout.AIutel import Optimizers from webscout.AIutel import Conversation from webscout.AIutel import AwesomePrompts from webscout.AIbase import Provider, AsyncProvider from webscout.AIutel import available_providers from typing import Any, AsyncGenerator g4f.debug.version_check = False working_providers = available_providers completion_allowed_models = [ "code-davinci-002", "text-ada-001", "text-babbage-001", "text-curie-001", "text-davinci-002", "text-davinci-003", ] default_models = { "completion": "text-davinci-003", "chat_completion": "gpt-3.5-turbo", } default_provider = "Koala" class AsyncGPT4FREE(AsyncProvider): def __init__( self, provider: str = default_provider, is_conversation: bool = True, auth: str = None, max_tokens: int = 600, model: str = None, ignore_working: bool = False, timeout: int = 30, intro: str = None, filepath: str = None, update_file: bool = True, proxies: dict = {}, history_offset: int = 10250, act: str = None, ): """Initialies GPT4FREE Args: provider (str, optional): gpt4free based provider name. Defaults to Koala. is_conversation (bool, optional): Flag for chatting conversationally. Defaults to True. auth (str, optional): Authentication value for the provider incase it needs. Defaults to None. max_tokens (int, optional): Maximum number of tokens to be generated upon completion. Defaults to 600. model (str, optional): LLM model name. Defaults to text-davinci-003|gpt-3.5-turbo. ignore_working (bool, optional): Ignore working status of the provider. Defaults to False. timeout (int, optional): Http request timeout. Defaults to 30. intro (str, optional): Conversation introductory prompt. Defaults to None. filepath (str, optional): Path to file containing conversation history. Defaults to None. update_file (bool, optional): Add new prompts and responses to the file. Defaults to True. proxies (dict, optional): Http request proxies. Defaults to {}. history_offset (int, optional): Limit conversation history to this number of last texts. Defaults to 10250. act (str|int, optional): Awesome prompt key or index. (Used as intro). Defaults to None. """ assert provider in available_providers, ( f"Provider '{provider}' is not yet supported. " f"Try others like {', '.join(available_providers)}" ) if model is None: model = default_models["chat_completion"] self.is_conversation = is_conversation self.max_tokens_to_sample = max_tokens self.stream_chunk_size = 64 self.timeout = timeout self.last_response = {} self.__available_optimizers = ( method for method in dir(Optimizers) if callable(getattr(Optimizers, method)) and not method.startswith("__") ) Conversation.intro = ( AwesomePrompts().get_act( act, raise_not_found=True, default=None, case_insensitive=True ) if act else intro or Conversation.intro ) self.conversation = Conversation( is_conversation, self.max_tokens_to_sample, filepath, update_file, ) self.conversation.history_offset = history_offset self.model = model self.provider = provider self.ignore_working = ignore_working self.auth = auth self.proxy = None if not proxies else list(proxies.values())[0] def __str__(self): return f"AsyncGPTFREE(provider={self.provider})" async def ask( self, prompt: str, stream: bool = False, raw: bool = False, optimizer: str = None, conversationally: bool = False, ) -> dict | AsyncGenerator: """Chat with AI asynchronously. Args: prompt (str): Prompt to be send. stream (bool, optional): Flag for streaming response. Defaults to False. raw (bool, optional): Stream back raw response as received. Defaults to False. optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None. conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False. Returns: dict|AsyncGenerator : ai content ```json { "text" : "How may I help you today?" } ``` """ conversation_prompt = self.conversation.gen_complete_prompt(prompt) if optimizer: if optimizer in self.__available_optimizers: conversation_prompt = getattr(Optimizers, optimizer)( conversation_prompt if conversationally else prompt ) else: raise Exception( f"Optimizer is not one of {self.__available_optimizers}" ) payload = dict( model=self.model, provider=self.provider, # g4f.Provider.Aichat, messages=[{"role": "user", "content": conversation_prompt}], stream=True, ignore_working=self.ignore_working, auth=self.auth, proxy=self.proxy, timeout=self.timeout, ) async def format_response(response): return dict(text=response) async def for_stream(): previous_chunks = "" response = g4f.ChatCompletion.create_async(**payload) async for chunk in response: previous_chunks += chunk formatted_resp = await format_response(previous_chunks) self.last_response.update(formatted_resp) yield previous_chunks if raw else formatted_resp self.conversation.update_chat_history( prompt, previous_chunks, ) async def for_non_stream(): async for _ in for_stream(): pass return self.last_response return for_stream() if stream else await for_non_stream() async def chat( self, prompt: str, stream: bool = False, optimizer: str = None, conversationally: bool = False, ) -> dict | AsyncGenerator: """Generate response `str` asynchronously. Args: prompt (str): Prompt to be send. stream (bool, optional): Flag for streaming response. Defaults to False. optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None. conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False. Returns: str|AsyncGenerator: Response generated """ async def for_stream(): async_ask = await self.ask( prompt, True, optimizer=optimizer, conversationally=conversationally ) async for response in async_ask: yield await self.get_message(response) async def for_non_stream(): return await self.get_message( await self.ask( prompt, False, optimizer=optimizer, conversationally=conversationally, ) ) return for_stream() if stream else await for_non_stream() async def get_message(self, response: dict) -> str: """Retrieves message only from response Args: response (dict): Response generated by `self.ask` Returns: str: Message extracted """ assert isinstance(response, dict), "Response should be of dict data-type only" return response["text"] class GPT4FREE(Provider): def __init__( self, provider: str = default_provider, is_conversation: bool = True, auth: str = None, max_tokens: int = 600, model: str = None, chat_completion: bool = True, ignore_working: bool = True, timeout: int = 30, intro: str = None, filepath: str = None, update_file: bool = True, proxies: dict = {}, history_offset: int = 10250, act: str = None, ): """Initialies GPT4FREE Args: provider (str, optional): gpt4free based provider name. Defaults to Koala. is_conversation (bool, optional): Flag for chatting conversationally. Defaults to True. auth (str, optional): Authentication value for the provider incase it needs. Defaults to None. max_tokens (int, optional): Maximum number of tokens to be generated upon completion. Defaults to 600. model (str, optional): LLM model name. Defaults to text-davinci-003|gpt-3.5-turbo. chat_completion(bool, optional): Provide native auto-contexting (conversationally). Defaults to False. ignore_working (bool, optional): Ignore working status of the provider. Defaults to False. timeout (int, optional): Http request timeout. Defaults to 30. intro (str, optional): Conversation introductory prompt. Defaults to None. filepath (str, optional): Path to file containing conversation history. Defaults to None. update_file (bool, optional): Add new prompts and responses to the file. Defaults to True. proxies (dict, optional): Http request proxies. Defaults to {}. history_offset (int, optional): Limit conversation history to this number of last texts. Defaults to 10250. act (str|int, optional): Awesome prompt key or index. (Used as intro). Defaults to None. """ assert provider in available_providers, ( f"Provider '{provider}' is not yet supported. " f"Try others like {', '.join(available_providers)}" ) if model is None: model = ( default_models["chat_completion"] if chat_completion else default_models["completion"] ) elif not chat_completion: assert model in completion_allowed_models, ( f"Model '{model}' is not yet supported for completion. " f"Try other models like {', '.join(completion_allowed_models)}" ) self.is_conversation = is_conversation self.max_tokens_to_sample = max_tokens self.stream_chunk_size = 64 self.timeout = timeout self.last_response = {} self.__available_optimizers = ( method for method in dir(Optimizers) if callable(getattr(Optimizers, method)) and not method.startswith("__") ) Conversation.intro = ( AwesomePrompts().get_act( act, raise_not_found=True, default=None, case_insensitive=True ) if act else intro or Conversation.intro ) self.conversation = Conversation( False if chat_completion else is_conversation, self.max_tokens_to_sample, filepath, update_file, ) self.conversation.history_offset = history_offset self.model = model self.provider = provider self.chat_completion = chat_completion self.ignore_working = ignore_working self.auth = auth self.proxy = None if not proxies else list(proxies.values())[0] self.__chat_class = g4f.ChatCompletion if chat_completion else g4f.Completion def ask( self, prompt: str, stream: bool = False, raw: bool = False, optimizer: str = None, conversationally: bool = False, ) -> dict: """Chat with AI Args: prompt (str): Prompt to be send. stream (bool, optional): Flag for streaming response. Defaults to False. raw (bool, optional): Stream back raw response as received. Defaults to False. optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None. conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False. Returns: dict : {} ```json { "text" : "How may I help you today?" } ``` """ conversation_prompt = self.conversation.gen_complete_prompt(prompt) if optimizer: if optimizer in self.__available_optimizers: conversation_prompt = getattr(Optimizers, optimizer)( conversation_prompt if conversationally else prompt ) else: raise Exception( f"Optimizer is not one of {self.__available_optimizers}" ) def payload(): if self.chat_completion: return dict( model=self.model, provider=self.provider, # g4f.Provider.Aichat, messages=[{"role": "user", "content": conversation_prompt}], stream=stream, ignore_working=self.ignore_working, auth=self.auth, proxy=self.proxy, timeout=self.timeout, ) else: return dict( model=self.model, prompt=conversation_prompt, provider=self.provider, stream=stream, ignore_working=self.ignore_working, auth=self.auth, proxy=self.proxy, timeout=self.timeout, ) def format_response(response): return dict(text=response) def for_stream(): previous_chunks = "" response = self.__chat_class.create(**payload()) for chunk in response: previous_chunks += chunk formatted_resp = format_response(previous_chunks) self.last_response.update(formatted_resp) yield previous_chunks if raw else formatted_resp self.conversation.update_chat_history( prompt, previous_chunks, ) def for_non_stream(): response = self.__chat_class.create(**payload()) formatted_resp = format_response(response) self.last_response.update(formatted_resp) self.conversation.update_chat_history(prompt, response) return response if raw else formatted_resp return for_stream() if stream else for_non_stream() def chat( self, prompt: str, stream: bool = False, optimizer: str = None, conversationally: bool = False, ) -> str: """Generate response `str` Args: prompt (str): Prompt to be send. stream (bool, optional): Flag for streaming response. Defaults to False. optimizer (str, optional): Prompt optimizer name - `[code, shell_command]`. Defaults to None. conversationally (bool, optional): Chat conversationally when using optimizer. Defaults to False. Returns: str: Response generated """ def for_stream(): for response in self.ask( prompt, True, optimizer=optimizer, conversationally=conversationally ): yield self.get_message(response) def for_non_stream(): return self.get_message( self.ask( prompt, False, optimizer=optimizer, conversationally=conversationally, ) ) return for_stream() if stream else for_non_stream() def get_message(self, response: dict) -> str: """Retrieves message only from response Args: response (dict): Response generated by `self.ask` Returns: str: Message extracted """ assert isinstance(response, dict), "Response should be of dict data-type only" return response["text"] from pathlib import Path from webscout.AIutel import default_path from json import dump, load from time import time from threading import Thread as thr from functools import wraps from rich.progress import Progress import logging results_path = Path(default_path) / "provider_test.json" def exception_handler(func): @wraps(func) def decorator(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: pass return decorator @exception_handler def is_working(provider: str) -> bool: """Test working status of a provider Args: provider (str): Provider name Returns: bool: is_working status """ bot = GPT4FREE(provider=provider, is_conversation=False) text = bot.chat("hello") assert isinstance(text, str) assert bool(text.strip()) assert " 2 return True class TestProviders: def __init__( self, test_at_once: int = 5, quiet: bool = False, timeout: int = 20, selenium: bool = False, do_log: bool = True, ): """Constructor Args: test_at_once (int, optional): Test n providers at once. Defaults to 5. quiet (bool, optinal): Disable stdout. Defaults to False. timout (int, optional): Thread timeout for each provider. Defaults to 20. selenium (bool, optional): Test even selenium dependent providers. Defaults to False. do_log (bool, optional): Flag to control logging. Defaults to True. """ self.test_at_once: int = test_at_once self.quiet = quiet self.timeout = timeout self.do_log = do_log self.__logger = logging.getLogger(__name__) self.working_providers: list = [ provider.__name__ for provider in g4f.Provider.__providers__ if provider.working ] if not selenium: import g4f.Provider.selenium as selenium_based from g4f import webdriver webdriver.has_requirements = False selenium_based_providers: list = dir(selenium_based) for provider in self.working_providers: try: selenium_based_providers.index(provider) except ValueError: pass else: self.__log( 10, f"Dropping provider - {provider} - [Selenium dependent]" ) self.working_providers.remove(provider) self.results_path: Path = results_path self.__create_empty_file(ignore_if_found=True) self.results_file_is_empty: bool = False def __log( self, level: int, message: str, ): """class logger""" if self.do_log: self.__logger.log(level, message) else: pass def __create_empty_file(self, ignore_if_found: bool = False): if ignore_if_found and self.results_path.is_file(): return with self.results_path.open("w") as fh: dump({"results": []}, fh) self.results_file_is_empty = True def test_provider(self, name: str): """Test each provider and save successful ones Args: name (str): Provider name """ try: bot = GPT4FREE(provider=name, is_conversation=False) start_time = time() text = bot.chat("hello there") assert isinstance(text, str), "Non-string response returned" assert bool(text.strip()), "Empty string" assert " 2 except Exception as e: pass else: self.results_file_is_empty = False with self.results_path.open() as fh: current_results = load(fh) new_result = dict(time=time() - start_time, name=name) current_results["results"].append(new_result) self.__log(20, f"Test result - {new_result['name']} - {new_result['time']}") with self.results_path.open("w") as fh: dump(current_results, fh) @exception_handler def main( self, ): self.__create_empty_file() threads = [] # Create a progress bar total = len(self.working_providers) with Progress() as progress: self.__log(20, f"Testing {total} providers : {self.working_providers}") task = progress.add_task( f"[cyan]Testing...[{self.test_at_once}]", total=total, visible=self.quiet == False, ) while not progress.finished: for count, provider in enumerate(self.working_providers, start=1): t1 = thr( target=self.test_provider, args=(provider,), ) t1.start() if count % self.test_at_once == 0 or count == len(provider): for t in threads: try: t.join(self.timeout) except Exception as e: pass threads.clear() else: threads.append(t1) progress.update(task, advance=1) def get_results(self, run: bool = False, best: bool = False) -> list[dict]: """Get test results Args: run (bool, optional): Run the test first. Defaults to False. best (bool, optional): Return name of the best provider. Defaults to False. Returns: list[dict]|str: Test results. """ if run or self.results_file_is_empty: self.main() with self.results_path.open() as fh: results: dict = load(fh) results = results["results"] if not results: if run: raise Exception("Unable to find working g4f provider") else: self.__log(30, "Hunting down working g4f providers.") return self.get_results(run=True, best=best) time_list = [] sorted_list = [] for entry in results: time_list.append(entry["time"]) time_list.sort() for time_value in time_list: for entry in results: if entry["time"] == time_value: sorted_list.append(entry) return sorted_list[0]["name"] if best else sorted_list @property def best(self): """Fastest provider overally""" return self.get_results(run=False, best=True) @property def auto(self): """Best working provider""" for result in self.get_results(run=False, best=False): self.__log(20, "Confirming working status of provider : " + result["name"]) if is_working(result["name"]): return result["name"]