import streamlit as st import json import faiss import numpy as np from sentence_transformers import SentenceTransformer import base64 from PIL import Image import io import cv2 from import FaceAnalysis from moviepy.editor import VideoFileClip from sklearn.cluster import DBSCAN from collections import defaultdict import plotly.graph_objs as go from sklearn.decomposition import PCA # Load models @st.cache_resource def load_models(): text_model = SentenceTransformer("all-MiniLM-L6-v2") image_model = SentenceTransformer("clip-ViT-B-32") face_app = FaceAnalysis(providers=['CPUExecutionProvider']) face_app.prepare(ctx_id=0, det_size=(640, 640)) return text_model, image_model, face_app text_model, image_model, face_app = load_models() # Load data @st.cache_data def load_data(video_id): with open(f"{video_id}_summary.json", "r") as f: summary = json.load(f) with open(f"{video_id}_transcription.json", "r") as f: transcription = json.load(f) with open(f"{video_id}_text_metadata.json", "r") as f: text_metadata = json.load(f) with open(f"{video_id}_image_metadata.json", "r") as f: image_metadata = json.load(f) with open(f"{video_id}_face_metadata.json", "r") as f: face_metadata = json.load(f) return summary, transcription, text_metadata, image_metadata, face_metadata video_id = "IMFUOexuEXw" video_path = "avengers_interview.mp4" summary, transcription, text_metadata, image_metadata, face_metadata = load_data(video_id) # Load FAISS indexes @st.cache_resource def load_indexes(video_id): text_index = faiss.read_index(f"{video_id}_text_index.faiss") image_index = faiss.read_index(f"{video_id}_image_index.faiss") face_index = faiss.read_index(f"{video_id}_face_index.faiss") return text_index, image_index, face_index text_index, image_index, face_index = load_indexes(video_id) # Face clustering function def cluster_faces(face_embeddings, eps=0.5, min_samples=3): clustering = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine').fit(face_embeddings) return clustering.labels_ # Face clustering visualization def plot_face_clusters(face_embeddings, labels, face_metadata): pca = PCA(n_components=3) embeddings_3d = pca.fit_transform(face_embeddings) unique_labels = set(labels) colors = [f'rgb({int(r*255)},{int(g*255)},{int(b*255)})' for r, g, b, _ in, 1, len(unique_labels)))] traces = [] for label, color in zip(unique_labels, colors): cluster_points = embeddings_3d[labels == label] hover_text = [] for i, point in enumerate(cluster_points): face = face_metadata[np.where(labels == label)[0][i]] hover_text.append(f"Cluster {label}
Time: {face['start']:.2f}s") trace = go.Scatter3d( x=cluster_points[:, 0], y=cluster_points[:, 1], z=cluster_points[:, 2], mode='markers', name=f'Cluster {label}', marker=dict( size=5, color=color, opacity=0.8 ), text=hover_text, hoverinfo='text' ) traces.append(trace) layout = go.Layout( title='Face Clusters Visualization', scene=dict( xaxis_title='PCA Component 1', yaxis_title='PCA Component 2', zaxis_title='PCA Component 3' ), margin=dict(r=0, b=0, l=0, t=40) ) fig = go.Figure(data=traces, layout=layout) return fig # Search functions def combined_search(query, text_index, image_index, text_metadata, image_metadata, text_model, image_model, n_results=5): if isinstance(query, str): text_vector = text_model.encode([query], convert_to_tensor=True).cpu().numpy() image_vector = image_model.encode([query], convert_to_tensor=True).cpu().numpy() else: # Assume it's an image image_vector = image_model.encode(query, convert_to_tensor=True).cpu().numpy() text_vector = image_vector # Use the same vector for text search in this case text_D, text_I =, n_results) image_D, image_I =, n_results) text_results = [{'data': text_metadata[i], 'distance': d, 'type': 'text'} for i, d in zip(text_I[0], text_D[0])] image_results = [{'data': image_metadata[i], 'distance': d, 'type': 'image'} for i, d in zip(image_I[0], image_D[0])] combined_results = sorted(text_results + image_results, key=lambda x: x['distance']) return combined_results[:n_results] def face_search(face_embedding, index, metadata, n_results=5): D, I =, -1), n_results) results = [metadata[i] for i in I[0]] return results, D[0] def detect_and_embed_face(image, face_app): img_array = np.array(image) faces = face_app.get(img_array) if len(faces) == 0: return None largest_face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1])) return largest_face.embedding def create_video_clip(video_path, start_time, end_time, output_path): with VideoFileClip(video_path) as video: new_clip = video.subclip(start_time, end_time) new_clip.write_videofile(output_path, codec="libx264", audio_codec="aac") return output_path # Streamlit UI st.title("Video Analysis Dashboard") # Sidebar with full video and scrollable transcript st.sidebar.header("Full Video") st.sidebar.header("Video Transcript") transcript_text = transcription['transcription'] st.sidebar.text_area("Full Transcript", transcript_text, height=300) # Main content st.header("Video Summary") col1, col2 = st.columns(2) with col1: st.subheader("Prominent Faces") for face in summary['prominent_faces']: st.write(f"Face ID: {face['id']}, Appearances: {face['appearances']}") if 'thumbnail' in face: image =['thumbnail']))) st.image(image, caption=f"Face ID: {face['id']}", width=100) with col2: st.subheader("Themes") for theme in summary['themes']: st.write(f"Theme ID: {theme['id']}, Keywords: {', '.join(theme['keywords'])}") # Face Clustering st.header("Face Clustering") face_embeddings = face_index.reconstruct_n(0, face_index.ntotal) face_labels = cluster_faces(face_embeddings) # Update face clusters in summary face_clusters = defaultdict(list) for i, label in enumerate(face_labels): face_clusters[label].append(face_metadata[i]) summary['face_clusters'] = [ { 'cluster_id': f'cluster_{label}', 'faces': cluster } for label, cluster in face_clusters.items() ] # Visualize face clusters st.subheader("Face Cluster Visualization") fig = plot_face_clusters(face_embeddings, face_labels, face_metadata) st.plotly_chart(fig) # Search functionality st.header("Search") search_type = st.selectbox("Select search type", ["Combined", "Face"]) if search_type == "Combined": search_method ="Choose search method", ["Text", "Image"]) if search_method == "Text": query = st.text_input("Enter your search query") if st.button("Search"): results = combined_search(query, text_index, image_index, text_metadata, image_metadata, text_model, image_model) st.subheader("Search Results") for result in results: st.write(f"Type: {result['type']}, Time: {result['data']['start']:.2f}s - {result['data']['end']:.2f}s, Distance: {result['distance']:.4f}") if 'text' in result['data']: st.write(f"Text: {result['data']['text']}") clip_path = create_video_clip(video_path, result['data']['start'], result['data']['end'], f"temp_clip_{result['data']['start']}.mp4") st.write("---") else: uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: image = st.image(image, caption="Uploaded Image", use_column_width=True) if st.button("Search"): results = combined_search(image, text_index, image_index, text_metadata, image_metadata, text_model, image_model) st.subheader("Image Search Results") for result in results: st.write(f"Type: {result['type']}, Time: {result['data']['start']:.2f}s - {result['data']['end']:.2f}s, Distance: {result['distance']:.4f}") clip_path = create_video_clip(video_path, result['data']['start'], result['data']['end'], f"temp_clip_{result['data']['start']}.mp4") st.write("---") elif search_type == "Face": face_search_type ="Choose face search method", ["Select from clusters", "Upload image"]) if face_search_type == "Select from clusters": cluster_id = st.selectbox("Select a face cluster", [f'cluster_{label}' for label in set(face_labels) if label != -1]) if st.button("Search"): selected_cluster = next(cluster for cluster in summary['face_clusters'] if cluster['cluster_id'] == cluster_id) st.subheader("Face Cluster Search Results") for face in selected_cluster['faces']: st.write(f"Time: {face['start']:.2f}s - {face['end']:.2f}s") clip_path = create_video_clip(video_path, face['start'], face['end'], f"temp_face_clip_{face['start']}.mp4") st.write("---") else: uploaded_file = st.file_uploader("Choose a face image...", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: image = st.image(image, caption="Uploaded Image", use_column_width=True) if st.button("Search"): face_embedding = detect_and_embed_face(image, face_app) if face_embedding is not None: face_results, face_distances = face_search(face_embedding, face_index, face_metadata) st.subheader("Face Search Results") for result, distance in zip(face_results, face_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") clip_path = create_video_clip(video_path, result['start'], result['end'], f"temp_face_clip_{result['start']}.mp4") st.write("---") else: st.error("No face detected in the uploaded image. Please try another image.")