# -*- coding: utf-8 -*- """wiki_chat.ipynb Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/drive/1P5rJeCXRSsDJw_1ksnHmodH6ng2Ot5NW """ # !pip install gradio # !pip install -U sentence-transformers # !pip install datasets from azure_utils import AzureVoiceData from polly_utils import PollyVoiceData, NEURAL_ENGINE from langchain.prompts import PromptTemplate from openai.error import AuthenticationError, InvalidRequestError, RateLimitError import re import sys from io import StringIO from threading import Lock from langchain.llms import OpenAI from langchain.chains.conversation.memory import ConversationBufferMemory from langchain.agents import tool, load_tools, initialize_agent from langchain import ConversationChain, LLMChain import whisper import warnings import boto3 import datetime from typing import Optional, Tuple from contextlib import closing # Console to variable import io import requests import os import gradio as gr from sentence_transformers import SentenceTransformer, CrossEncoder, util #from torch import tensor as torch_tensor #from datasets import load_dataset from greg_funcs import mrkl_rspnd, get_cot OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # AWS keys AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") aws_region_name = "us-east-1" os.environ["AWS_DEFAULT_REGION"] = aws_region_name # exhumana api key # todo: may need to pay to get one os.environ['EXHUMAN_API_KEY'] = '' # XXX remove, we are not using the talking head because it costs money and doesnt work. # news, tmdb keys os.environ["NEWS_API_KEY"] = '' os.environ["TMDB_BEARER_TOKEN"] = '' news_api_key = os.environ["NEWS_API_KEY"] tmdb_bearer_token = os.environ["TMDB_BEARER_TOKEN"] TOOLS_LIST = ['serpapi', 'wolfram-alpha', 'pal-math', 'pal-colored-objects', 'news-api', 'tmdb-api', 'open-meteo-api'] # 'google-search' TOOLS_DEFAULT_LIST = ['serpapi'] BUG_FOUND_MSG = "Congratulations, you've found a bug in this application!" AUTH_ERR_MSG = "Please paste your OpenAI key from openai.com to use this application. It is not necessary to hit a button or key after pasting it." MAX_TOKENS = 512 TEMPERATURE = 0 LOOPING_TALKING_HEAD = "videos/humancare.mp4" TALKING_HEAD_WIDTH = "192" MAX_TALKING_HEAD_TEXT_LENGTH = 155 # Pertains to Express-inator functionality NUM_WORDS_DEFAULT = 0 MAX_WORDS = 400 FORMALITY_DEFAULT = "N/A" TEMPERATURE_DEFAULT = 0.5 EMOTION_DEFAULT = "N/A" LANG_LEVEL_DEFAULT = "N/A" TRANSLATE_TO_DEFAULT = "N/A" LITERARY_STYLE_DEFAULT = "N/A" PROMPT_TEMPLATE = PromptTemplate( input_variables=["original_words", "num_words", "formality", "emotions", "lang_level", "translate_to", "literary_style"], template="Restate {num_words}{formality}{emotions}{lang_level}{translate_to}{literary_style}the following: \n{original_words}\n", ) POLLY_VOICE_DATA = PollyVoiceData() AZURE_VOICE_DATA = AzureVoiceData() VOICE_GENDER = 'Female' # "Male" # Pertains to WHISPER functionality WHISPER_DETECT_LANG = "Detect language" # UNCOMMENT TO USE WHISPER warnings.filterwarnings("ignore") WHISPER_MODEL = whisper.load_model("tiny") print("WHISPER_MODEL", WHISPER_MODEL) # gradio settings # css CSS = ".gradio-container {background-color: lightgray}" # placeholder for chat text input PLACEHOLDER = "How much is the monthly premium?" # example questions EXAMPLES = ["What is the name of my plan?", "How much is the monthly premium?", "Is prostate cancer screening supported by my plan?", "Have I spent enough on drug expenses for catastrophic coverage to kick in?"] AUTHORS = """
This application, developed by Greg Hayworth, Srikanth Tangelloju, Lincoln Snyder, Michal Piekarczyk, and Xingde Jiang, demonstrates a conversational agent implemented with OpenAI GPT-3.5 and LangChain. For faster inference without waiting in queue, you may duplicate the space.
""" # UNCOMMENT TO USE WHISPER def transcribe(aud_inp, whisper_lang): if aud_inp is None: return "" aud = whisper.load_audio(aud_inp) aud = whisper.pad_or_trim(aud) mel = whisper.log_mel_spectrogram(aud).to(WHISPER_MODEL.device) _, probs = WHISPER_MODEL.detect_language(mel) options = whisper.DecodingOptions(fp16=False) if whisper_lang != WHISPER_DETECT_LANG: whisper_lang_code = POLLY_VOICE_DATA.get_whisper_lang_code( whisper_lang) options = whisper.DecodingOptions( fp16=False, language=whisper_lang_code) result = whisper.decode(WHISPER_MODEL, mel, options) print("result.text", result.text) result_text = "" if result and result.text: result_text = result.text return result_text # Pertains to Express-inator functionality def transform_text(desc, express_chain, num_words, formality, anticipation_level, joy_level, trust_level, fear_level, surprise_level, sadness_level, disgust_level, anger_level, lang_level, translate_to, literary_style): num_words_prompt = "" if num_words and int(num_words) != 0: num_words_prompt = "using up to " + str(num_words) + " words, " # Change some arguments to lower case formality = formality.lower() anticipation_level = anticipation_level.lower() joy_level = joy_level.lower() trust_level = trust_level.lower() fear_level = fear_level.lower() surprise_level = surprise_level.lower() sadness_level = sadness_level.lower() disgust_level = disgust_level.lower() anger_level = anger_level.lower() formality_str = "" if formality != "n/a": formality_str = "in a " + formality + " manner, " # put all emotions into a list emotions = [] if anticipation_level != "n/a": emotions.append(anticipation_level) if joy_level != "n/a": emotions.append(joy_level) if trust_level != "n/a": emotions.append(trust_level) if fear_level != "n/a": emotions.append(fear_level) if surprise_level != "n/a": emotions.append(surprise_level) if sadness_level != "n/a": emotions.append(sadness_level) if disgust_level != "n/a": emotions.append(disgust_level) if anger_level != "n/a": emotions.append(anger_level) emotions_str = "" if len(emotions) > 0: if len(emotions) == 1: emotions_str = "with emotion of " + emotions[0] + ", " else: emotions_str = "with emotions of " + \ ", ".join(emotions[:-1]) + " and " + emotions[-1] + ", " lang_level_str = "" if lang_level != LANG_LEVEL_DEFAULT: lang_level_str = "at a " + lang_level + \ " level, " if translate_to == TRANSLATE_TO_DEFAULT else "" translate_to_str = "" if translate_to != TRANSLATE_TO_DEFAULT: translate_to_str = "translated to " + \ ("" if lang_level == TRANSLATE_TO_DEFAULT else lang_level + " level ") + translate_to + ", " literary_style_str = "" if literary_style != LITERARY_STYLE_DEFAULT: if literary_style == "Prose": literary_style_str = "as prose, " elif literary_style == "Summary": literary_style_str = "as a summary, " elif literary_style == "Outline": literary_style_str = "as an outline numbers and lower case letters, " elif literary_style == "Bullets": literary_style_str = "as bullet points using bullets, " elif literary_style == "Poetry": literary_style_str = "as a poem, " elif literary_style == "Haiku": literary_style_str = "as a haiku, " elif literary_style == "Limerick": literary_style_str = "as a limerick, " elif literary_style == "Joke": literary_style_str = "as a very funny joke with a setup and punchline, " elif literary_style == "Knock-knock": literary_style_str = "as a very funny knock-knock joke, " formatted_prompt = PROMPT_TEMPLATE.format( original_words=desc, num_words=num_words_prompt, formality=formality_str, emotions=emotions_str, lang_level=lang_level_str, translate_to=translate_to_str, literary_style=literary_style_str ) trans_instr = num_words_prompt + formality_str + emotions_str + \ lang_level_str + translate_to_str + literary_style_str if express_chain and len(trans_instr.strip()) > 0: generated_text = express_chain.run( {'original_words': desc, 'num_words': num_words_prompt, 'formality': formality_str, 'emotions': emotions_str, 'lang_level': lang_level_str, 'translate_to': translate_to_str, 'literary_style': literary_style_str}).strip() else: print("Not transforming text") generated_text = desc # replace all newlines withno audio' # Save the audio stream returned by Amazon Polly on Lambda's temp directory if "AudioStream" in response: with closing(response["AudioStream"]) as stream: # output = os.path.join("/tmp/", "speech.mp3") try: with open('audios/tempfile.mp3', 'wb') as f: f.write(stream.read()) temp_aud_file = gr.File("audios/tempfile.mp3") temp_aud_file_url = "/file=" + temp_aud_file.value['name'] html_audio = f'' except IOError as error: # Could not write to file, exit gracefully print(error) return None, None else: # The response didn't contain audio data, exit gracefully print("Could not stream audio") return None, None return html_audio, "audios/tempfile.mp3" def create_html_video(file_name, width): temp_file_url = "/file=" + tmp_vid_file.value['name'] html_video = f'' return html_video def do_html_video_speak(words_to_speak, azure_language): azure_voice = AZURE_VOICE_DATA.get_voice(azure_language, "Male") if not azure_voice: azure_voice = "en-US-ChristopherNeural" headers = {"Authorization": f"Bearer {os.environ['EXHUMAN_API_KEY']}"} body = { 'bot_name': 'humancare', 'bot_response': words_to_speak, 'azure_voice': azure_voice, 'azure_style': 'friendly', 'animation_pipeline': 'high_speed', } api_endpoint = "https://api.exh.ai/animations/v1/generate_lipsync" res = requests.post(api_endpoint, json=body, headers=headers) print("res.status_code: ", res.status_code) html_video = '
no video' if isinstance(res.content, bytes): response_stream = io.BytesIO(res.content) print("len(res.content)): ", len(res.content)) with open('videos/tempfile.mp4', 'wb') as f: f.write(response_stream.read()) temp_file = gr.File("videos/tempfile.mp4") temp_file_url = "/file=" + temp_file.value['name'] html_video = f'' else: print('video url unknown') return html_video, "videos/tempfile.mp4" def update_selected_tools(widget, state, llm): if widget: state = widget chain, express_chain = load_chain(state, llm) return state, llm, chain, express_chain def update_talking_head(widget, state): if widget: state = widget video_html_talking_head = create_html_video( LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH) return state, video_html_talking_head else: # return state, create_html_video(LOOPING_TALKING_HEAD, "32") return None, "" def update_foo(widget, state): if widget: state = widget return state with gr.Blocks(css=CSS) as block: llm_state = gr.State() history_state = gr.State() chain_state = gr.State() express_chain_state = gr.State() tools_list_state = gr.State(TOOLS_DEFAULT_LIST) trace_chain_state = gr.State(False) speak_text_state = gr.State(False) talking_head_state = gr.State(True) # Takes the input and repeats it back to the user, optionally transforming it. monologue_state = gr.State(False) # Pertains to Express-inator functionality num_words_state = gr.State(NUM_WORDS_DEFAULT) formality_state = gr.State(FORMALITY_DEFAULT) anticipation_level_state = gr.State(EMOTION_DEFAULT) joy_level_state = gr.State(EMOTION_DEFAULT) trust_level_state = gr.State(EMOTION_DEFAULT) fear_level_state = gr.State(EMOTION_DEFAULT) surprise_level_state = gr.State(EMOTION_DEFAULT) sadness_level_state = gr.State(EMOTION_DEFAULT) disgust_level_state = gr.State(EMOTION_DEFAULT) anger_level_state = gr.State(EMOTION_DEFAULT) lang_level_state = gr.State(LANG_LEVEL_DEFAULT) translate_to_state = gr.State(TRANSLATE_TO_DEFAULT) literary_style_state = gr.State(LITERARY_STYLE_DEFAULT) # Pertains to WHISPER functionality whisper_lang_state = gr.State(WHISPER_DETECT_LANG) with gr.Tab("Chat"): with gr.Row(): with gr.Column(): gr.Markdown("""# NLP QA Chat Demo""") with gr.Row(): with gr.Column(scale=1, min_width=TALKING_HEAD_WIDTH, visible=True): speak_text_cb = gr.Checkbox(label="Enable speech", value=True) speak_text_cb.change(update_foo, inputs=[speak_text_cb, speak_text_state], outputs=[speak_text_state]) my_file = gr.File(label="Upload a file", type="file", visible=False) tmp_vid_file = gr.File(LOOPING_TALKING_HEAD, visible=False) # tmp_file_url = "/file=" + tmp_vid_file.value['name'] # htm_video = create_html_video( # LOOPING_TALKING_HEAD, TALKING_HEAD_WIDTH) # video_html = gr.HTML(htm_video) video_html = gr.HTML("") # my_aud_file = gr.File(label="Audio file", type="file", visible=True) tmp_aud_file = gr.File("audios/tempfile.mp3", visible=False) tmp_aud_file_url = "/file=" + tmp_aud_file.value['name'] htm_audio = f'' # htm_audio = f'' # htm_audio = f'' audio_html = gr.HTML(htm_audio) with gr.Column(scale=7): chatbot = gr.Chatbot() with gr.Accordion(label='Show AI chain of thought: ', open=False,): ai_cot = gr.HTML(show_label=False) with gr.Row(): message = gr.Textbox(label="What's on your mind??", placeholder=PLACEHOLDER, lines=1) submit = gr.Button(value="Send", variant="secondary").style( full_width=False) # UNCOMMENT TO USE WHISPER with gr.Row(): audio_comp = gr.Microphone(source="microphone", type="filepath", label="Just say it!", interactive=True, streaming=False) audio_comp.change(transcribe, inputs=[ audio_comp, whisper_lang_state], outputs=[message]) gr.Examples( examples=EXAMPLES, inputs=message ) with gr.Tab("Settings"): tools_cb_group = gr.CheckboxGroup(label="Tools:", choices=TOOLS_LIST, value=TOOLS_DEFAULT_LIST) tools_cb_group.change(update_selected_tools, inputs=[tools_cb_group, tools_list_state, llm_state], outputs=[tools_list_state, llm_state, chain_state, express_chain_state]) trace_chain_cb = gr.Checkbox( label="Show reasoning chain in chat bubble", value=False) trace_chain_cb.change(update_foo, inputs=[trace_chain_cb, trace_chain_state], outputs=[trace_chain_state]) # speak_text_cb = gr.Checkbox(label="Speak text from agent", value=False) # speak_text_cb.change(update_foo, inputs=[speak_text_cb, speak_text_state], # outputs=[speak_text_state]) talking_head_cb = gr.Checkbox(label="Show talking head", value=False) talking_head_cb.change(update_talking_head, inputs=[talking_head_cb, talking_head_state], outputs=[talking_head_state, video_html]) monologue_cb = gr.Checkbox(label="Babel fish mode (translate/restate what you enter, no conversational agent)", value=False) monologue_cb.change(update_foo, inputs=[monologue_cb, monologue_state], outputs=[monologue_state]) with gr.Tab("Whisper STT"): whisper_lang_radio = gr.Radio(label="Whisper speech-to-text language:", choices=[ WHISPER_DETECT_LANG, "Arabic", "Arabic (Gulf)", "Catalan", "Chinese (Cantonese)", "Chinese (Mandarin)", "Danish", "Dutch", "English (Australian)", "English (British)", "English (Indian)", "English (New Zealand)", "English (South African)", "English (US)", "English (Welsh)", "Finnish", "French", "French (Canadian)", "German", "German (Austrian)", "Georgian", "Hindi", "Icelandic", "Indonesian", "Italian", "Japanese", "Korean", "Norwegian", "Polish", "Portuguese (Brazilian)", "Portuguese (European)", "Romanian", "Russian", "Spanish (European)", "Spanish (Mexican)", "Spanish (US)", "Swedish", "Turkish", "Ukrainian", "Welsh"], value=WHISPER_DETECT_LANG) whisper_lang_radio.change(update_foo, inputs=[whisper_lang_radio, whisper_lang_state], outputs=[whisper_lang_state]) with gr.Tab("Translate to"): lang_level_radio = gr.Radio(label="Language level:", choices=[ LANG_LEVEL_DEFAULT, "1st grade", "2nd grade", "3rd grade", "4th grade", "5th grade", "6th grade", "7th grade", "8th grade", "9th grade", "10th grade", "11th grade", "12th grade", "University"], value=LANG_LEVEL_DEFAULT) lang_level_radio.change(update_foo, inputs=[lang_level_radio, lang_level_state], outputs=[lang_level_state]) translate_to_radio = gr.Radio(label="Language:", choices=[ TRANSLATE_TO_DEFAULT, "Arabic", "Arabic (Gulf)", "Catalan", "Chinese (Cantonese)", "Chinese (Mandarin)", "Danish", "Dutch", "English (Australian)", "English (British)", "English (Indian)", "English (New Zealand)", "English (South African)", "English (US)", "English (Welsh)", "Finnish", "French", "French (Canadian)", "German", "German (Austrian)", "Georgian", "Hindi", "Icelandic", "Indonesian", "Italian", "Japanese", "Korean", "Norwegian", "Polish", "Portuguese (Brazilian)", "Portuguese (European)", "Romanian", "Russian", "Spanish (European)", "Spanish (Mexican)", "Spanish (US)", "Swedish", "Turkish", "Ukrainian", "Welsh", "emojis", "Gen Z slang", "how the stereotypical Karen would say it", "Klingon", "Pirate", "Strange Planet expospeak technical talk", "Yoda"], value=TRANSLATE_TO_DEFAULT) translate_to_radio.change(update_foo, inputs=[translate_to_radio, translate_to_state], outputs=[translate_to_state]) # with gr.Tab("Formality"): # formality_radio = gr.Radio(label="Formality:", # choices=[FORMALITY_DEFAULT, # "Casual", "Polite", "Honorific"], # value=FORMALITY_DEFAULT) # formality_radio.change(update_foo, # inputs=[formality_radio, formality_state], # outputs=[formality_state]) # with gr.Tab("Lit style"): # literary_style_radio = gr.Radio(label="Literary style:", choices=[ # LITERARY_STYLE_DEFAULT, "Prose", "Summary", "Outline", "Bullets", "Poetry", "Haiku", "Limerick", "Joke", # "Knock-knock"], # value=LITERARY_STYLE_DEFAULT) # literary_style_radio.change(update_foo, # inputs=[literary_style_radio, # literary_style_state], # outputs=[literary_style_state]) # with gr.Tab("Emotions"): # anticipation_level_radio = gr.Radio(label="Anticipation level:", # choices=[ # EMOTION_DEFAULT, "Interest", "Anticipation", "Vigilance"], # value=EMOTION_DEFAULT) # anticipation_level_radio.change(update_foo, # inputs=[anticipation_level_radio, # anticipation_level_state], # outputs=[anticipation_level_state]) # joy_level_radio = gr.Radio(label="Joy level:", # choices=[EMOTION_DEFAULT, # "Serenity", "Joy", "Ecstasy"], # value=EMOTION_DEFAULT) # joy_level_radio.change(update_foo, # inputs=[joy_level_radio, joy_level_state], # outputs=[joy_level_state]) # trust_level_radio = gr.Radio(label="Trust level:", # choices=[ # EMOTION_DEFAULT, "Acceptance", "Trust", "Admiration"], # value=EMOTION_DEFAULT) # trust_level_radio.change(update_foo, # inputs=[trust_level_radio, trust_level_state], # outputs=[trust_level_state]) # fear_level_radio = gr.Radio(label="Fear level:", # choices=[EMOTION_DEFAULT, # "Apprehension", "Fear", "Terror"], # value=EMOTION_DEFAULT) # fear_level_radio.change(update_foo, # inputs=[fear_level_radio, fear_level_state], # outputs=[fear_level_state]) # surprise_level_radio = gr.Radio(label="Surprise level:", # choices=[ # EMOTION_DEFAULT, "Distraction", "Surprise", "Amazement"], # value=EMOTION_DEFAULT) # surprise_level_radio.change(update_foo, # inputs=[surprise_level_radio, # surprise_level_state], # outputs=[surprise_level_state]) # sadness_level_radio = gr.Radio(label="Sadness level:", # choices=[ # EMOTION_DEFAULT, "Pensiveness", "Sadness", "Grief"], # value=EMOTION_DEFAULT) # sadness_level_radio.change(update_foo, # inputs=[sadness_level_radio, # sadness_level_state], # outputs=[sadness_level_state]) # disgust_level_radio = gr.Radio(label="Disgust level:", # choices=[EMOTION_DEFAULT, # "Boredom", "Disgust", "Loathing"], # value=EMOTION_DEFAULT) # disgust_level_radio.change(update_foo, # inputs=[disgust_level_radio, # disgust_level_state], # outputs=[disgust_level_state]) # anger_level_radio = gr.Radio(label="Anger level:", # choices=[EMOTION_DEFAULT, # "Annoyance", "Anger", "Rage"], # value=EMOTION_DEFAULT) # anger_level_radio.change(update_foo, # inputs=[anger_level_radio, anger_level_state], # outputs=[anger_level_state]) with gr.Tab("Max words"): num_words_slider = gr.Slider(label="Max number of words to generate (0 for don't care)", value=NUM_WORDS_DEFAULT, minimum=0, maximum=MAX_WORDS, step=10) num_words_slider.change(update_foo, inputs=[num_words_slider, num_words_state], outputs=[num_words_state]) gr.HTML(AUTHORS) gr.HTML("""