import os
import json
import gradio as gr
import requests
import csv
import argparse
import shutil
from vlog4chat import Vlogger4chat
from vlog4debate import Debate
from utils import download_video
#prompt_templates = {"Default ChatGPT": ""}
parser = argparse.ArgumentParser()
parser.add_argument('--video_path', default='./BV11H4y1F7uH.mp4')
parser.add_argument('--alpha', default=10, type=int, help='Maximum number of segments for the KTS algorithm; the larger the value, the fewer the segments.')
parser.add_argument('--beta', default=1, type=int, help='The smallest time gap between successive clips, in seconds.')
parser.add_argument('--data_dir', default='./', type=str, help='Directory for saving videos and logs.')
parser.add_argument('--tmp_dir', default='./', type=str, help='Directory for saving intermediate files.')
# * Models settings *
parser.add_argument('--openai_api_key', default='xxx', type=str, help='OpenAI API key')
parser.add_argument('--image_caption', action='store_true', dest='image_caption', default=True, help='Use BLIP image captioning (enabled by default)')
parser.add_argument('--dense_caption', action='store_true', dest='dense_caption', default=True, help='Use dense captioning (enabled by default)')
parser.add_argument('--feature_extractor', default='./clip-vit-base-patch32', help='Select the feature extractor model for video segmentation')
parser.add_argument('--feature_extractor_device', choices=['cuda', 'cpu'], default='cuda', help='Select the device: cuda or cpu')
parser.add_argument('--image_captioner', choices=['blip2-opt', 'blip2-flan-t5', 'blip'], dest='captioner_base_model', default='blip2-opt', help='blip2 requires 15 GB of GPU memory; blip requires 6 GB')
parser.add_argument('--image_captioner_device', choices=['cuda', 'cpu'], default='cuda', help='Select the device: cuda or cpu; GPU memory larger than 14 GB is recommended')
parser.add_argument('--dense_captioner_device', choices=['cuda', 'cpu'], default='cuda', help='Select the device: cuda or cpu; a GPU with less than 6 GB of memory is not recommended')
parser.add_argument('--audio_translator', default='large')
parser.add_argument('--audio_translator_device', choices=['cuda', 'cpu'], default='cuda')
parser.add_argument('--gpt_version', choices=['gpt-3.5-turbo'], default='gpt-3.5-turbo')
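# Example invocation (a sketch; "demo.py" stands in for this script's actual filename,
# and the API key is a placeholder):
#   python demo.py --video_path ./BV11H4y1F7uH.mp4 --openai_api_key sk-... \
#       --feature_extractor ./clip-vit-base-patch32 --gpt_version gpt-3.5-turbo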
args = parser.parse_args()
vlogger = Vlogger4chat(args)
def get_empty_state():
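    """Return a fresh chat state: a token counter plus an empty message history."""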
return {"total_tokens": 0, "messages": []}
def submit_message(prompt, state):
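    """Answer a question about the current video and append the exchange to the chat history."""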
history = state['messages']
if not prompt:
return gr.update(value=''), [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)], state
prompt_msg = { "role": "user", "content": prompt }
try:
history.append(prompt_msg)
answer = vlogger.chat2video(prompt)
history.append({"role": "system", "content": answer})
    except Exception as e:
        # prompt_msg was already appended above, so only record the error here
        history.append({
            "role": "system",
            "content": f"Error: {e}"
        })
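    # Pair the flat role-tagged history into (user, system) tuples, the format gr.Chatbot expects,
    # e.g. [{"role": "user", "content": "Q"}, {"role": "system", "content": "A"}] -> [("Q", "A")]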
chat_messages = [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)]
return '', chat_messages, state
def submit_message_debate(prompt, state):
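    """Run a three-player debate (vlog4debate.Debate) on the user's question and append its answer to the chat history."""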
history = state['messages']
if not prompt:
return gr.update(value=''), [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)], state
prompt_msg = { "role": "user", "content": prompt }
    try:
        history.append(prompt_msg)
        # prompt is guaranteed to be non-empty here (checked above)
        debate_topic = prompt
        with open("./config4all.json", "r") as f:
            config = json.load(f)
        config['debate_topic'] = debate_topic
        debate = Debate(num_players=3, config=config, temperature=0, sleep_time=0)
        answer = debate.run()
#chat_messages = [(res["debate_topic"]), (res["base_answer"]), (res["debate_answer"]), (res["Reason"])]
history.append({"role": "system", "content": answer})
    except Exception as e:
        # prompt_msg was already appended above, so only record the error here
        history.append({
            "role": "system",
            "content": f"Error: {e}"
        })
chat_messages = [(history[i]['content'], history[i+1]['content']) for i in range(0, len(history)-1, 2)]
return '', chat_messages, state
def clear_conversation():
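    """Reset the backend video/chat history and clear the UI components bound to this callback."""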
vlogger.clean_history()
return gr.update(value=None, visible=True), gr.update(value=None, visible=True), gr.update(value=None, interactive=True), None, gr.update(value=None, visible=True), get_empty_state()
# download video from any online URL
def subvid_fn(vid):
print(vid)
save_path = download_video(vid)
return gr.update(value=save_path)
# Local upload, for use when running on a local URL, e.g. http://127.0.0.1:6006
def uploaded_video(video_file):
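    """Copy an uploaded video into the working directory and return its path for the video component."""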
UPLOAD_FOLDER = "./"
if not os.path.exists(UPLOAD_FOLDER):
os.mkdir(UPLOAD_FOLDER)
shutil.copy(video_file, UPLOAD_FOLDER)
    gr.Info("File uploaded!")
save_path = os.path.join(UPLOAD_FOLDER, os.path.basename(video_file))
return gr.update(value=save_path)
def vlog_fn(vid_path):
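    """Turn the selected video into a textual log via the Vlogger4chat pipeline."""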
print(vid_path)
if vid_path is None:
        log_text = "====== Please choose an existing video from the library or provide a video URL 🤔 ======"
else:
log_list = vlogger.video2log(vid_path)
log_text = "\n".join(log_list)
return gr.update(value=log_text, visible=True)
# Dictionary that records the submitted survey answers
answers = {}
# Handle the answers selected by the user
def submit_answers_pretest(question1, question2, question3, question4, question5, question6, question7, question8, question9, question10):
    responses = [question1, question2, question3, question4, question5,
                 question6, question7, question8, question9, question10]
    for i, response in enumerate(responses, start=1):
        answers[f'Question {i}'] = response
    # Append the answers to a file
    with open('answers4pretest.txt', 'a') as f:
        for i, response in enumerate(responses, start=1):
            f.write(f"Question {i}: {response}\n")
        f.write("\n")
    # Return a confirmation message
    return "Thank you for submitting your answers!"
def submit_answers_posttest(question1, question2, question3, question4, question5, question6, question7, question8, question9, question10):
    responses = [question1, question2, question3, question4, question5,
                 question6, question7, question8, question9, question10]
    for i, response in enumerate(responses, start=1):
        answers[f'Question {i}'] = response
    # Append the answers to a file
    with open('answers4posttest.txt', 'a') as f:
        for i, response in enumerate(responses, start=1):
            f.write(f"Question {i}: {response}\n")
        f.write("\n")
    # Return a confirmation message
    return "Thank you for submitting your answers!"
css = """
#col-container {max-width: 80%; margin-left: auto; margin-right: auto;}
#video_inp {min-height: 100px}
#chatbox {min-height: 100px;}
#header {text-align: center;}
#hint {font-size: 2.0em; padding: 0.5em; margin: 0;}
.message { font-size: 1.2em; }
"""
with gr.Blocks(css=css) as demo:
with gr.Tabs():
        # First tab: pre-test survey
        with gr.TabItem("Step 1 (Pre-test)"):
gr.Markdown("## Survey: Please answer the following questions")
            # Question 1
question1 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="1. What is your favorite color?",
)
            # Question 2
question2 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="2. What is your preferred mode of transport?",
)
            # Question 3
question3 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="3. Which type of cuisine do you prefer?",
)
            # Question 4
question4 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="4. What is your favorite color?",
)
            # Question 5
question5 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="5. What is your preferred mode of transport?",
)
            # Question 6
question6 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="6. Which type of cuisine do you prefer?",
)
            # Question 7
question7 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="7. Which type of cuisine do you prefer?",
)
            # Question 8
question8 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="8. What is your favorite color?",
)
            # Question 9
question9 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="9. What is your preferred mode of transport?",
)
            # Question 10
question10 = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="10. Which type of cuisine do you prefer?",
)
            # Submit button
            submit_button = gr.Button("Submit Answers")
            # Display the confirmation message
            output = gr.Textbox(label="Message")
            # Call submit_answers_pretest when the submit button is clicked
submit_button.click(
submit_answers_pretest,
inputs=[question1, question2, question3, question4, question5, question6, question7, question8, question9, question10],
outputs=output
)
        # Second tab: VLog usage
        with gr.TabItem("Step 2 (Use VLog)"):
state = gr.State(get_empty_state())
with gr.Column(elem_id="col-container"):
gr.Markdown("""## 🎞️ 视频Chat:
Powered by CLIP, BLIP2, GRIT, RAM++, PaddleOCR, Whisper, Custom LLMs and LangChain""",
elem_id="header")
with gr.Row():
with gr.Column():
video_inp = gr.Video(label="video_input")
gr.Markdown("Step 1: 请在下方选取视频", elem_id="hint")
examples = gr.Examples(
examples=[
["./BV11H4y1F7uH.mp4"],
],
inputs=[video_inp],
)
#with gr.Row():
# video_id = gr.Textbox(value="", placeholder="Download video url", show_label=False)
                        # vidsub_btn = gr.Button("Upload web video")
chatbot = gr.Chatbot(elem_id="chatbox")
                        input_message = gr.Textbox(show_label=False, placeholder="Type your question and press Enter", visible=True)
                        btn_submit = gr.Button("Ask about the video")
                        gr.Markdown("If you are not satisfied with the answer above, enter a question to debate below, *e.g.* *Is a smaller variance always better?*", elem_id="hint")
#chatbot_debate = gr.Chatbot(elem_id="chatbox")
                        input_message_debate = gr.Textbox(show_label=False, placeholder="Enter a debate topic and press Enter", visible=True)
                        btn_submit_debate = gr.Button("Start a debate on the question")
                        btn_clear_conversation = gr.Button("🔃 Start a new conversation")
with gr.Column():
                        vlog_btn = gr.Button("Step 2: View the video log (please wait about 70 seconds)")
vlog_outp = gr.Textbox(label="Document output", lines=70)
total_tokens_str = gr.Markdown(elem_id="total_tokens_str")
gr.HTML('''