# NOTE: removed "Spaces: / Sleeping" page artifacts (Hugging Face Spaces scrape residue, not app source).
"""Gradio app for a Foley audio human-preference study.

Shows pairs of videos side by side ("ours" vs. "foleycrafter") and records
per-pair preference answers to a Hugging Face dataset repository.
"""
import json
import os
import random
import subprocess
from datetime import datetime

import gradio as gr
from huggingface_hub import Repository

# Fixed seed so every participant sees the same shuffled pair ordering.
random.seed(1234)

# Git identity used by the dataset Repository when committing responses.
subprocess.run(["git", "config", "--global", "user.email", "[email protected]"])
subprocess.run(["git", "config", "--global", "user.name", "czyang"])

hf_token = os.getenv("HF_TOKEN")
print("HF Token is none?", hf_token is None)

# Clone (or reuse) the dataset repo that stores user responses.
DATASET_REPO_URL = "https://huggingface.co./datasets/czyang/Foley-User-Study-Response-V2"
repo = Repository(
    local_dir="user_responses",
    clone_from=DATASET_REPO_URL,
    # NOTE(review): `use_auth_token` is deprecated in newer huggingface_hub
    # releases in favor of `token=` — confirm the pinned version before changing.
    use_auth_token=hf_token
)
def prepare_test_cases(json_path="videos/videos.json"):
    """Load the video metadata and randomly assign each pair to display slots.

    For every entry, the 'ours' and 'foleycrafter' clips are shuffled into the
    'Video 1' / 'Video 2' keys so participants cannot infer which is which.

    Args:
        json_path: path to the JSON file mapping video ids to
            {'ours': ..., 'foleycrafter': ..., ...} entries.

    Returns:
        The loaded dict, with 'Video 1' and 'Video 2' added to each entry.
    """
    with open(json_path, "r") as f:
        video_dict = json.load(f)
    for entry in video_dict.values():
        # The original pre-ordered the pair on a coin flip, but that ordering
        # was immediately overwritten by shuffle — the shuffle alone suffices.
        pair = [entry['ours'], entry['foleycrafter']]
        random.shuffle(pair)
        entry['Video 1'], entry['Video 2'] = pair
    return video_dict
# Build the test cases once at import time; pair order within each entry and
# the order the pairs are shown in are both randomized (seeded above).
video_dict = prepare_test_cases()
video_ids = list(video_dict.keys())
random.shuffle(video_ids)

# One set of questions per pair; "{}" placeholders are filled with the
# pair's audio prompt at render time.
questions = [
    "Which video's audio best matches the sound of {}?",
    "In which video is the timing of the audio best synchronized with what you can see in the video?",
    "Which video has audio that sounds cleaner and more high definition? (Please ignore the type of sound and whether it's timed to the video, focus only on the audio quality.)",
    "Assuming the video is meant to sound like {}, which video has the best audio overall?"
]

# JSONL file (inside the cloned dataset repo) holding one submission per line.
submissions_file = "user_responses/response.jsonl"
def has_already_submitted(user_id, path="user_responses/response.jsonl"):
    """Return True if a submission with this user id is already recorded.

    Args:
        user_id: the session-derived id stored under "u_id" in each line.
        path: JSONL submissions file; defaults to the module's response log.

    Returns:
        True if any valid JSONL line has a matching "u_id", else False.
    """
    if not os.path.exists(path):
        return False
    with open(path, "r") as f:
        for line in f:
            try:
                submission = json.loads(line)
            except json.JSONDecodeError:
                # A single corrupt line should not block every future check.
                continue
            if submission.get("u_id") == user_id:
                return True
    return False
def save_responses(unique_submission, *responses):
    """Validate and persist one participant's answers, then push to the Hub.

    Args:
        unique_submission: if True, reject a second submission from the same
            session id.
        *responses: the Radio values for every pair (len(questions) per pair,
            ordered semantic/sync/quality/overall), followed by the hidden
            session-info dict produced by `predict`.

    Returns:
        A status message shown to the participant.
    """
    timestamp = datetime.now().isoformat()
    # The last element is the session-info JSON appended via the hidden widget.
    info = responses[-1]
    responses = responses[:-1]
    user_id = f"{info['session_id']}"

    if unique_submission and has_already_submitted(user_id):
        return "You have already submitted responses. Thank you for participating!"

    result = {
        "u_id": user_id,
        "timestamp": timestamp,
        "responses": []
    }
    for index, video_id in enumerate(video_ids):
        start_idx = index * len(questions)
        response = responses[start_idx:start_idx + len(questions)]
        # Every question for every pair must be answered before saving anything.
        if any(r is None for r in response):
            return "Please answer all questions before submitting."
        # Map each chosen label ("Video 1"/"Video 2") back to the method it hides.
        result["responses"].append({
            video_id: {
                'semantic': video_dict[video_id][response[0]],
                'sync': video_dict[video_id][response[1]],
                'quality': video_dict[video_id][response[2]],
                'overall': video_dict[video_id][response[3]],
            }
        })
    # BUG FIX: the original sorted with key=lambda x: x.keys(); comparing
    # dict_keys views uses subset semantics (an arbitrary partial order).
    # Each entry has exactly one key — sort by that key string.
    result["responses"].sort(key=lambda entry: next(iter(entry)))

    # Append locally, then push the updated JSONL to the dataset repo.
    with open(submissions_file, "a") as f:
        f.write(json.dumps(result) + "\n")
    repo.push_to_hub()
    return "All responses saved! Thank you for participating!"
def create_interface(unique_submission=False):
    """Build the Gradio Blocks UI for the preference study.

    Args:
        unique_submission: forwarded to `save_responses`; if True each
            session id may only submit once.

    Returns:
        The constructed `gr.Blocks` demo (caller is responsible for launch).
    """
    with gr.Blocks() as demo:
        gr.Markdown("# Human Preference Study: Video Comparison")
        gr.Markdown("""
        In this study, you will watch (and listen to) pairs of videos side by side.
        Please watch and **listen** to each pair of videos carefully and answer the three associated questions.
        **Headphones are recommended!**
        """)

        # One group of widgets per video pair; `responses` collects every
        # Radio in the same flat order save_responses expects.
        responses = []
        for index, video_id in enumerate(video_ids):
            entry = video_dict[video_id]
            audio_prompt = entry['audio prompt']
            gr.Markdown(f"### Video Pair {index + 1}")
            with gr.Row():
                gr.Video(entry['Video 1'], label="Video 1")
                gr.Video(entry['Video 2'], label="Video 2")
            with gr.Column():
                for question in questions:
                    # Questions with a "{}" placeholder are specialized to
                    # this pair's audio prompt; the rest are used verbatim.
                    label = question.format(audio_prompt) if "{}" in question else question
                    responses.append(gr.Radio(["Video 1", "Video 2"], label=label, value=None))
            gr.Markdown("---")

        # Hidden JSON widget filled with per-session metadata on page load.
        info = gr.JSON(visible=False)
        demo.load(predict, None, info)

        submit_btn = gr.Button("Submit")
        result_message = gr.Textbox(label="Message (please only submit once)", interactive=False)
        submit_btn.click(
            fn=lambda *args: save_responses(unique_submission, *args),
            inputs=responses + [info],
            outputs=result_message
        )
    return demo
def predict(request: gr.Request):
    """Capture per-session metadata from the incoming Gradio request.

    Returned as the hidden session-info payload that `save_responses`
    later reads the session id from.
    """
    return {
        "ip": request.client.host,
        "user_agent": request.headers["user-agent"],
        "headers": request.headers,
        "session_id": request.session_hash,
    }
if __name__ == "__main__":
    # The old comment referenced a `--unique` CLI flag that was never
    # implemented; one-submission-per-session is simply hard-enabled here.
    demo = create_interface(unique_submission=True)
    demo.launch(share=True)