Spaces:
Sleeping
Sleeping
import os | |
import json | |
import gradio as gr | |
import requests | |
import csv | |
import argparse | |
import shutil | |
from vlog4chat import Vlogger4chat | |
from vlog4debate import Debate | |
from utils import download_video | |
#prompt_templates = {"Default ChatGPT": ""} | |
# Command-line configuration; the parsed namespace is handed to Vlogger4chat.
parser = argparse.ArgumentParser()
parser.add_argument('--video_path', default='./BV11H4y1F7uH.mp4')
parser.add_argument(
    '--alpha', default=10, type=int,
    help='Determine the maximum segment number for KTS algorithm, the larger the value, the fewer segments.',
)
parser.add_argument(
    '--beta', default=1, type=int,
    help='The smallest time gap between successive clips, in seconds.',
)
parser.add_argument('--data_dir', default='./', type=str, help='Directory for saving videos and logs.')
parser.add_argument('--tmp_dir', default='./', type=str, help='Directory for saving intermediate files.')

# * Models settings *
parser.add_argument('--openai_api_key', default='xxx', type=str, help='OpenAI API key')
# Both caption stages default to True, so the store_true flags alone could
# never switch them off. The matching --no_* flags write False to the same
# destination, which keeps every existing invocation working unchanged.
parser.add_argument(
    '--image_caption', action='store_true', dest='image_caption', default=True,
    help='Set this flag to True if you want to use BLIP Image Caption',
)
parser.add_argument(
    '--no_image_caption', action='store_false', dest='image_caption',
    help='Disable BLIP Image Caption',
)
parser.add_argument(
    '--dense_caption', action='store_true', dest='dense_caption', default=True,
    help='Set this flag to True if you want to use Dense Caption',
)
parser.add_argument(
    '--no_dense_caption', action='store_false', dest='dense_caption',
    help='Disable Dense Caption',
)
parser.add_argument(
    '--feature_extractor', default='./clip-vit-base-patch32',
    help='Select the feature extractor model for video segmentation',
)
parser.add_argument(
    '--feature_extractor_device', choices=['cuda', 'cpu'], default='cuda',
    help='Select the device: cuda or cpu',
)
parser.add_argument(
    '--image_captioner', choices=['blip2-opt', 'blip2-flan-t5', 'blip'],
    dest='captioner_base_model', default='blip2-opt',
    help='blip2 requires 15G GPU memory, blip requires 6G GPU memory',
)
parser.add_argument(
    '--image_captioner_device', choices=['cuda', 'cpu'], default='cuda',
    help='Select the device: cuda or cpu, gpu memory larger than 14G is recommended',
)
parser.add_argument(
    '--dense_captioner_device', choices=['cuda', 'cpu'], default='cuda',
    help='Select the device: cuda or cpu, < 6G GPU is not recommended>',
)
parser.add_argument('--audio_translator', default='large')
parser.add_argument('--audio_translator_device', choices=['cuda', 'cpu'], default='cuda')
parser.add_argument('--gpt_version', choices=['gpt-3.5-turbo'], default='gpt-3.5-turbo')
args = parser.parse_args()

# Single backend instance shared by every UI callback in this module.
vlogger = Vlogger4chat(args)
def get_empty_state():
    """Build a brand-new chat state: zero tokens used and no messages yet."""
    fresh_state = dict(total_tokens=0, messages=[])
    return fresh_state
def submit_message(prompt, state):
    """Ask the video chat backend about ``prompt`` and refresh the chat view.

    Args:
        prompt: Text typed by the user. An empty value leaves the history
            untouched and only re-renders it.
        state: Session dict whose ``'messages'`` list holds alternating
            user/system entries; it is extended in place.

    Returns:
        A 3-tuple: the new value for the input textbox, the list of
        (question, answer) pairs for the chatbot, and ``state``.
    """
    history = state['messages']
    if prompt:
        # Record the question exactly once, before calling the backend, so a
        # failure still leaves the history as strict question/answer pairs
        # and the error text is what gets shown as the answer.
        history.append({"role": "user", "content": prompt})
        try:
            answer = vlogger.chat2video(prompt)
        except Exception as e:
            # UI boundary: surface the failure in the chat instead of
            # letting the callback crash.
            answer = f"Error: {e}"
        history.append({"role": "system", "content": answer})
        textbox_value = ''
    else:
        textbox_value = gr.update(value='')

    # Even positions are questions, odd positions are answers; zip drops a
    # trailing unpaired entry.
    contents = [message['content'] for message in history]
    chat_messages = list(zip(contents[0::2], contents[1::2]))
    return textbox_value, chat_messages, state
def submit_message_debate(prompt, state):
    """Run a three-player debate on ``prompt`` and append the result to the chat.

    Args:
        prompt: Debate topic typed by the user. An empty value leaves the
            history untouched and only re-renders it.
        state: Session dict whose ``'messages'`` list holds alternating
            user/system entries; it is extended in place.

    Returns:
        A 3-tuple: the new value for the input textbox, the list of
        (question, answer) pairs for the chatbot, and ``state``.
    """
    history = state['messages']
    if prompt:
        # Record the topic exactly once, before running the debate, so a
        # failure still leaves the history as strict question/answer pairs.
        history.append({"role": "user", "content": prompt})
        try:
            # Context manager closes the config file even if parsing fails.
            with open("./config4all.json", "r", encoding="utf-8") as config_file:
                config = json.load(config_file)
            config['debate_topic'] = prompt
            debate = Debate(num_players=3, config=config, temperature=0, sleep_time=0)
            answer = debate.run()
        except Exception as e:
            # UI boundary: surface the failure in the chat instead of
            # letting the callback crash.
            answer = f"Error: {e}"
        history.append({"role": "system", "content": answer})
        textbox_value = ''
    else:
        textbox_value = gr.update(value='')

    # Even positions are questions, odd positions are answers; zip drops a
    # trailing unpaired entry.
    contents = [message['content'] for message in history]
    chat_messages = list(zip(contents[0::2], contents[1::2]))
    return textbox_value, chat_messages, state
def clear_conversation():
    """Wipe the backend chat history and return blank values for the UI.

    Returns:
        Six values: three "clear and keep visible" updates (positions 1, 2
        and 5), one "clear and keep interactive" update (position 3),
        ``None`` (position 4), and a fresh empty state (position 6).
    """
    vlogger.clean_history()
    blank_and_visible = {"value": None, "visible": True}
    return (
        gr.update(**blank_and_visible),
        gr.update(**blank_and_visible),
        gr.update(value=None, interactive=True),
        None,
        gr.update(**blank_and_visible),
        get_empty_state(),
    )
# Download a video from an online URL.
def subvid_fn(vid):
    """Download ``vid`` via ``download_video`` and return an update carrying the result.

    The value returned by ``download_video`` is presumably the local path of
    the saved file -- confirm against ``utils.download_video``.
    """
    print(vid)
    downloaded = download_video(vid)
    return gr.update(value=downloaded)
# Local upload; intended for runs served on a local URL such as http://127.0.0.1:6006
def uploaded_video(video_file):
    """Copy an uploaded video into the upload folder and return its new path.

    Args:
        video_file: Filesystem path of the uploaded file.

    Returns:
        A ``gr.update`` whose value is the path of the copy inside the
        upload folder.
    """
    UPLOAD_FOLDER = "./"
    # exist_ok avoids the check-then-create race and also handles nested paths.
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    try:
        shutil.copy(video_file, UPLOAD_FOLDER)
    except shutil.SameFileError:
        # The file already lives in the upload folder; nothing to copy.
        pass
    gr.Info("File Uploaded!!!")
    save_path = os.path.join(UPLOAD_FOLDER, os.path.basename(video_file))
    return gr.update(value=save_path)
def vlog_fn(vid_path):
    """Produce the text log for the selected video.

    When no video is selected (``vid_path`` is ``None``) a hint message is
    shown instead; otherwise the entries returned by ``vlogger.video2log``
    are joined with newlines.
    """
    print(vid_path)
    if vid_path is not None:
        log_text = "\n".join(vlogger.video2log(vid_path))
    else:
        log_text = "====== Please choose existing video from the library or provide video URL 🤔====="
    return gr.update(value=log_text, visible=True)
# Module-level record of the most recently submitted survey answers,
# keyed by "Question N".
answers = dict()
# Handler for the pre-test survey submission.
def submit_answers_pretest(question1, question2, question3, question4, question5, question6, question7, question8, question9, question10):
    """Record the ten pre-test answers and append them to ``answers4pretest.txt``.

    Each answer is stored in the module-level ``answers`` dict under
    ``"Question N"`` and written as one ``Question N: <answer>`` line; a
    blank line terminates the submission.

    Returns:
        A confirmation message for the UI.
    """
    responses = (
        question1, question2, question3, question4, question5,
        question6, question7, question8, question9, question10,
    )
    labelled = [
        (f"Question {number}", choice)
        for number, choice in enumerate(responses, start=1)
    ]
    for label, choice in labelled:
        answers[label] = choice

    # Persist the submission: one line per question, then a blank separator.
    record = "".join(f"{label}: {choice}\n" for label, choice in labelled) + "\n"
    with open('answers4pretest.txt', 'a') as f:
        f.write(record)

    # Confirmation shown to the user.
    return "谢谢你提交答案!"
def submit_answers_posttest(question1, question2, question3, question4, question5, question6, question7, question8, question9, question10):
    """Record the ten post-test answers and append them to ``answers4posttest.txt``.

    Each answer is stored in the module-level ``answers`` dict under
    ``"Question N"`` and written as one ``Question N: <answer>`` line; a
    blank line terminates the submission.

    Returns:
        A confirmation message for the UI.
    """
    responses = (
        question1, question2, question3, question4, question5,
        question6, question7, question8, question9, question10,
    )
    labelled = [
        (f"Question {number}", choice)
        for number, choice in enumerate(responses, start=1)
    ]
    for label, choice in labelled:
        answers[label] = choice

    # Persist the submission: one line per question, then a blank separator.
    record = "".join(f"{label}: {choice}\n" for label, choice in labelled) + "\n"
    with open('answers4posttest.txt', 'a') as f:
        f.write(record)

    # Confirmation shown to the user.
    return "谢谢你提交答案!"
# Stylesheet passed to gr.Blocks; selectors match the elem_id values used in the layout.
css = (
    "\n"
    "#col-container {max-width: 80%; margin-left: auto; margin-right: auto;}\n"
    "#video_inp {min-height: 100px}\n"
    "#chatbox {min-height: 100px;}\n"
    "#header {text-align: center;}\n"
    "#hint {font-size: 1.0em; padding: 0.5em; margin: 0;}\n"
    ".message { font-size: 1.2em; }\n"
)
# Survey prompts shown, in this order, on both the pre-test and post-test tabs.
_SURVEY_LABELS = (
    "1. What is your favorite color?",
    "2. What is your preferred mode of transport?",
    "3. Which type of cuisine do you prefer?",
    "4. What is your favorite color?",
    "5. What is your preferred mode of transport?",
    "6. Which type of cuisine do you prefer?",
    "7. Which type of cuisine do you prefer?",
    "8. What is your favorite color?",
    "9. What is your preferred mode of transport?",
    "10. Which type of cuisine do you prefer?",
)


def _build_survey_tab(on_submit):
    """Lay out the ten-question survey in the current tab and wire its submit button to ``on_submit``."""
    gr.Markdown("## Survey: Please answer the following questions")
    questions = [
        gr.Radio(choices=["1", "2", "3", "4", "5"], label=label)
        for label in _SURVEY_LABELS
    ]
    submit_button = gr.Button("Submit Answers")
    # Shows the confirmation message returned by the handler.
    output = gr.Textbox(label="Message")
    submit_button.click(on_submit, inputs=questions, outputs=output)


with gr.Blocks(css=css) as demo:
    with gr.Tabs():
        # Tab 1: pre-test survey.
        with gr.TabItem("第一步(预先测试)"):
            _build_survey_tab(submit_answers_pretest)

        # Tab 2: video chat, debate and video log.
        with gr.TabItem("第二步(VLog使用)"):
            state = gr.State(get_empty_state())
            with gr.Column(elem_id="col-container"):
                gr.Markdown(
                    "## 🎞️ 视频Chat:\n"
                    "Powered by CLIP, BLIP2, GRIT, RAM++, PaddleOCR, Whisper, Custom LLMs and LangChain",
                    elem_id="header",
                )
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("Step 1: 请在页面最左下方选取视频", elem_id="hint")
                        video_inp = gr.Video(label="video_input")
                        # Downloading from a URL (subvid_fn) is currently not wired into the UI.
                        chatbot = gr.Chatbot(elem_id="chatbox")
                        input_message = gr.Textbox(show_label=False, placeholder="输入提问内容并按回车", visible=True)
                        btn_submit = gr.Button("提问视频内容")
                        gr.Markdown("如果对上面的回答不满意,请在下方输入需要辩论的问题, *e.g.* *方差越小越好?*", elem_id="hint")
                        input_message_debate = gr.Textbox(show_label=False, placeholder="输入辩论主题并按回车", visible=True)
                        btn_submit_debate = gr.Button("发起问题辩论")
                        btn_clear_conversation = gr.Button("🔃 开始新的对话")
                    with gr.Column():
                        vlog_btn = gr.Button("Step 2: 查看视频日志(请耐心等待70秒左右)")
                        vlog_outp = gr.Textbox(label="Document output", lines=50)
                        total_tokens_str = gr.Markdown(elem_id="total_tokens_str")
                # NOTE(review): the example picker is placed below the two-column
                # row to match the "bottom-left" hint above -- confirm the layout.
                gr.Markdown("请点击下方视频(任意选择一个视频进行播放)", elem_id="hint")
                examples = gr.Examples(
                    examples=[
                        ["./BV11H4y1F7uH.mp4"],
                    ],
                    inputs=[video_inp],
                )
            gr.HTML('''<br><br><br><center>You can duplicate this Space to skip the queue:<a href="https://huggingface.co./spaces/anzorq/chatgpt-demo?duplicate=true"><img src="https://bit.ly/3gLdBN6" alt="Duplicate Space"></a><br></center>''')

            # The chat handlers return (textbox, chat pairs, state); list all
            # three as outputs so the returned values line up with components.
            btn_submit.click(submit_message, [input_message, state], [input_message, chatbot, state])
            input_message.submit(submit_message, [input_message, state], [input_message, chatbot, state])
            btn_submit_debate.click(submit_message_debate, [input_message_debate, state], [input_message_debate, chatbot, state])
            input_message_debate.submit(submit_message_debate, [input_message_debate, state], [input_message_debate, chatbot, state])
            btn_clear_conversation.click(clear_conversation, [], [input_message, input_message_debate, video_inp, chatbot, vlog_outp, state])
            vlog_btn.click(vlog_fn, [video_inp], [vlog_outp])

        # Tab 3: post-test survey.
        with gr.TabItem("第三步(再次测试)"):
            _build_survey_tab(submit_answers_posttest)

    # Registered without a handler function.
    demo.load(queue=False)

demo.queue()
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(share=True)