import streamlit as st import json import faiss import numpy as np from sentence_transformers import SentenceTransformer import base64 from PIL import Image import io import cv2 from insightface.app import FaceAnalysis # Load models @st.cache_resource def load_models(): text_model = SentenceTransformer("all-MiniLM-L6-v2") image_model = SentenceTransformer("clip-ViT-B-32") face_app = FaceAnalysis(providers=['CPUExecutionProvider']) face_app.prepare(ctx_id=0, det_size=(640, 640)) return text_model, image_model, face_app text_model, image_model, face_app = load_models() # Load data @st.cache_data def load_data(video_id): with open(f"{video_id}_summary.json", "r") as f: summary = json.load(f) with open(f"{video_id}_transcription.json", "r") as f: transcription = json.load(f) with open(f"{video_id}_text_metadata.json", "r") as f: text_metadata = json.load(f) with open(f"{video_id}_image_metadata.json", "r") as f: image_metadata = json.load(f) with open(f"{video_id}_object_infos.json", "r") as f: object_infos = json.load(f) with open(f"{video_id}_face_metadata.json", "r") as f: face_metadata = json.load(f) return summary, transcription, text_metadata, image_metadata, object_infos, face_metadata video_id = "IMFUOexuEXw" summary, transcription, text_metadata, image_metadata, object_infos, face_metadata = load_data(video_id) # Load FAISS indexes @st.cache_resource def load_indexes(video_id): text_index = faiss.read_index(f"{video_id}_text_index.faiss") image_index = faiss.read_index(f"{video_id}_image_index.faiss") face_index = faiss.read_index(f"{video_id}_face_index.faiss") return text_index, image_index, face_index text_index, image_index, face_index = load_indexes(video_id) # Search functions def text_search(query, index, metadata, model, n_results=5): query_vector = model.encode([query], convert_to_tensor=True).cpu().numpy() D, I = index.search(query_vector, n_results) results = [metadata[i] for i in I[0]] return results, D[0] def image_search(image, index, metadata, model, n_results=5): image_vector = model.encode(image, convert_to_tensor=True).cpu().numpy() D, I = index.search(image_vector.reshape(1, -1), n_results) results = [metadata[i] for i in I[0]] return results, D[0] def face_search(face_embedding, index, metadata, n_results=5): D, I = index.search(np.array(face_embedding).reshape(1, -1), n_results) results = [metadata[i] for i in I[0]] return results, D[0] def detect_and_embed_face(image, face_app): img_array = np.array(image) faces = face_app.get(img_array) if len(faces) == 0: return None largest_face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1])) return largest_face.embedding # Streamlit UI st.title("Video Analysis Dashboard") # Display video summary st.header("Video Summary") st.subheader("Prominent Faces") for face in summary['prominent_faces']: st.write(f"Face ID: {face['id']}, Appearances: {face['appearances']}, First Appearance: {face['first_appearance']:.2f}s") if 'thumbnail' in face: image = Image.open(io.BytesIO(base64.b64decode(face['thumbnail']))) st.image(image, caption=f"Face ID: {face['id']}", width=100) st.subheader("Prominent Objects") for obj in summary['prominent_objects']: st.write(f"Object ID: {obj['id']}, Appearances: {obj['appearances']}, Representative Frame: {obj['representative_frame']:.2f}s") st.subheader("Themes") for theme in summary['themes']: st.write(f"Theme ID: {theme['id']}, Keywords: {', '.join(theme['keywords'])}") # Search functionality st.header("Search") search_type = st.selectbox("Select search type", ["Text", "Face", "Image"]) if search_type == "Text": query = st.text_input("Enter your search query") search_target = st.multiselect("Search in", ["Transcript", "Frames"], default=["Transcript"]) if st.button("Search"): if "Transcript" in search_target: text_results, text_distances = text_search(query, text_index, text_metadata, text_model) st.subheader("Transcript Search Results") for result, distance in zip(text_results, text_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") st.write(f"Text: {result['text']}") st.write("---") if "Frames" in search_target: frame_results, frame_distances = text_search(query, image_index, image_metadata, image_model) st.subheader("Frame Search Results") for result, distance in zip(frame_results, frame_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") st.write("---") elif search_type == "Face": face_search_type = st.radio("Choose face search method", ["Select from video", "Upload image"]) if face_search_type == "Select from video": face_id = st.selectbox("Select a face", [face['id'] for face in summary['prominent_faces']]) if st.button("Search"): selected_face = next(face for face in summary['prominent_faces'] if face['id'] == face_id) face_results, face_distances = face_search(selected_face['embedding'], face_index, face_metadata) st.subheader("Face Search Results") for result, distance in zip(face_results, face_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") st.write(f"Face ID: {result['face_id']}") st.write("---") else: uploaded_file = st.file_uploader("Choose a face image...", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: image = Image.open(uploaded_file) st.image(image, caption="Uploaded Image", use_column_width=True) if st.button("Search"): face_embedding = detect_and_embed_face(image, face_app) if face_embedding is not None: face_results, face_distances = face_search(face_embedding, face_index, face_metadata) st.subheader("Face Search Results") for result, distance in zip(face_results, face_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") st.write(f"Face ID: {result['face_id']}") st.write("---") else: st.error("No face detected in the uploaded image. Please try another image.") elif search_type == "Image": image_search_type = st.radio("Choose image search method", ["Upload image", "Text description"]) if image_search_type == "Upload image": uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: image = Image.open(uploaded_file) st.image(image, caption="Uploaded Image", use_column_width=True) if st.button("Search"): image_results, image_distances = image_search(image, image_index, image_metadata, image_model) st.subheader("Image Search Results") for result, distance in zip(image_results, image_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") st.write("---") else: text_query = st.text_input("Enter a description of the image you're looking for") if st.button("Search"): image_results, image_distances = text_search(text_query, image_index, image_metadata, image_model) st.subheader("Image Search Results") for result, distance in zip(image_results, image_distances): st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}") st.write("---") # Display transcription st.header("Video Transcription") st.write(transcription['transcription'])