"""Streamlit app for the Veczip Embeddings Tool.

Compresses pre-existing embeddings from a CSV file, or generates embeddings
from raw text and compresses them, using dejan.veczip.
"""

import ast
import csv
import itertools
import os
import tempfile

import numpy as np
import pandas as pd
import streamlit as st
import torch
from dejan.veczip import veczip
from transformers import AutoModel, AutoTokenizer

# -----------------
# CSV parsing and compression helpers
# -----------------


def is_numeric(s):
    """Checks if a given value can be parsed as a number."""
    try:
        float(s)
        return True
    except (TypeError, ValueError):
        return False


def parse_as_array(val):
    """Parses a value as an array of numbers; returns None if it is not one."""
    if isinstance(val, (int, float)):
        return [val]
    val_str = str(val).strip()
    if val_str.startswith("[") and val_str.endswith("]"):
        try:
            arr = ast.literal_eval(val_str)
            if isinstance(arr, list) and all(is_numeric(str(x)) for x in arr):
                return arr
            return None
        except (ValueError, SyntaxError):
            return None
    parts = val_str.split(",")
    if len(parts) > 1 and all(is_numeric(p.strip()) for p in parts):
        return [float(p.strip()) for p in parts]
    return None


def get_line_pattern(row):
    """Detects the pattern ('text', 'num', or 'arr') of each cell in a row."""
    pattern = []
    for val in row:
        if parse_as_array(val) is not None:
            pattern.append("arr")
        elif is_numeric(val):
            pattern.append("num")
        else:
            pattern.append("text")
    return pattern


def detect_header(lines):
    """Detects whether the first CSV row is a header.

    A header is assumed when all data rows share one pattern and the first
    row's pattern differs from it.
    """
    if len(lines) < 2:
        return False
    first_line_pattern = get_line_pattern(lines[0])
    subsequent_patterns = [get_line_pattern(r) for r in lines[1:]]
    if len(subsequent_patterns) > 1:
        if (all(p == subsequent_patterns[0] for p in subsequent_patterns)
                and first_line_pattern != subsequent_patterns[0]):
            return True
    elif subsequent_patterns and first_line_pattern != subsequent_patterns[0]:
        return True
    return False


def looks_like_id_column(col_values):
    """Checks if a column looks like an ID column (sequential integers)."""
    try:
        nums = [int(float(v)) for v in col_values]
        return nums == list(range(nums[0], nums[0] + len(nums)))
    except (TypeError, ValueError):
        return False


def detect_columns(file_path):
    """Detects embedding and metadata columns in a CSV file."""
    with open(file_path, "r", newline="", encoding="utf-8") as f:
        try:
            sample = f.read(1024 * 10)  # Read a 10 KB sample for sniffing.
            dialect = csv.Sniffer().sniff(sample, delimiters=",\t;|")
            delimiter = dialect.delimiter
        except csv.Error:
            delimiter = ","
        f.seek(0)  # Reset the file pointer after sampling.
        reader = csv.reader(f, delimiter=delimiter)
        first_lines = list(itertools.islice(reader, 10))

    if not first_lines:
        raise ValueError("No data")

    has_header = detect_header(first_lines)
    if has_header:
        header = first_lines[0]
        data = first_lines[1:]
    else:
        header = []
        data = first_lines
    if not data:
        return has_header, [], [], delimiter

    cols = list(zip(*data))
    candidate_arrays = []
    candidate_numeric = []
    id_like_columns = set()
    text_like_columns = set()

    for ci, col in enumerate(cols):
        col = list(col)
        parsed_rows = [parse_as_array(val) for val in col]
        if all(r is not None for r in parsed_rows):
            # Arrays of a single consistent length are embedding candidates.
            lengths = {len(r) for r in parsed_rows}
            if len(lengths) == 1:
                candidate_arrays.append(ci)
            else:
                text_like_columns.add(ci)
            continue
        if all(is_numeric(v) for v in col):
            if looks_like_id_column(col):
                id_like_columns.add(ci)
            else:
                candidate_numeric.append(ci)
        else:
            text_like_columns.add(ci)

    identified_embedding_columns = set(candidate_arrays)
    identified_metadata_columns = set()

    if candidate_arrays:
        # Array columns are the embeddings; loose numeric columns are metadata.
        identified_metadata_columns.update(candidate_numeric)
    elif len(candidate_numeric) > 1:
        # No array columns: treat multiple numeric columns as one embedding.
        identified_embedding_columns.update(candidate_numeric)
    else:
        identified_metadata_columns.update(candidate_numeric)

    identified_metadata_columns.update(id_like_columns)
    identified_metadata_columns.update(text_like_columns)

    if header:
        for ci, col_name in enumerate(header):
            if col_name.lower() == "id":
                identified_embedding_columns.discard(ci)
                identified_metadata_columns.add(ci)
                break

    emb_cols = [header[i] if header and i < len(header) else i
                for i in sorted(identified_embedding_columns)]
    meta_cols = [header[i] if header and i < len(header) else i
                 for i in sorted(identified_metadata_columns)]
    return has_header, emb_cols, meta_cols, delimiter
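
# Illustrative check of the detection heuristics above (a sketch, not part
# of the app; the file name and values are made up). For a CSV like:
#
#   id,text,embedding
#   1,apple,"[0.1, 0.2, 0.3]"
#   2,banana,"[0.4, 0.5, 0.6]"
#
# calling the detector:
#
#   has_header, emb_cols, meta_cols, delim = detect_columns("demo.csv")
#
# should yield has_header=True, emb_cols=['embedding'],
# meta_cols=['id', 'text'], delim=',' -- the 'id' column is excluded from
# the embeddings both by the sequential-integer check and by its name.
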
def load_and_validate_embeddings(input_file):
    """Loads, validates, and summarizes embedding data from a CSV."""
    print(f"Loading data from {input_file}...")
    has_header, embedding_columns, metadata_columns, delimiter = detect_columns(input_file)
    data = pd.read_csv(input_file, header=0 if has_header else None, delimiter=delimiter)

    def is_valid_row(row):
        # Keep only rows whose embedding cells all parse as arrays.
        return all(parse_as_array(row[col]) is not None for col in embedding_columns)

    valid_rows_filter = data.apply(is_valid_row, axis=1)
    data = data[valid_rows_filter]

    print("\n=== File Summary ===")
    print(f"File: {input_file}")
    print(f"Rows: {len(data)}")
    print(f"Metadata Columns: {metadata_columns}")
    print(f"Embedding Columns: {embedding_columns}")
    print("====================\n")
    return data, embedding_columns, metadata_columns, has_header, list(data.columns)


def save_compressed_embeddings(output_file, metadata, compressed_embeddings,
                               embedding_columns, original_columns, has_header):
    """Saves compressed embeddings to a CSV file."""
    print(f"Saving compressed data to {output_file}...")
    metadata = metadata.copy()
    for i, col in enumerate(embedding_columns):
        metadata[col] = compressed_embeddings[i].tolist()
    final_df = metadata.reindex(columns=original_columns) if original_columns else metadata
    final_df.to_csv(output_file, index=False, header=has_header)
    print(f"Data saved to {output_file}.")


def run_veczip(input_file, target_dims=16):
    """Runs veczip compression on the input data."""
    data, embedding_columns, metadata_columns, has_header, original_columns = \
        load_and_validate_embeddings(input_file)

    all_embeddings = []
    for col in embedding_columns:
        embeddings = np.array([parse_as_array(x) for x in data[col].values])
        all_embeddings.append(embeddings)

    # Stack every embedding column row-wise so veczip picks one shared set
    # of dimensions to retain across all of them.
    combined_embeddings = np.concatenate(all_embeddings, axis=0)
    compressor = veczip(target_dims=target_dims)
    retained_indices = compressor.compress(combined_embeddings)
    compressed_embeddings = [embeddings[:, retained_indices] for embeddings in all_embeddings]

    temp_output = tempfile.NamedTemporaryFile(suffix=".csv", delete=False)
    temp_output.close()  # Close the handle; only the path is needed below.
    save_compressed_embeddings(temp_output.name, data[metadata_columns],
                               compressed_embeddings, embedding_columns,
                               original_columns, has_header)
    return temp_output.name


# -----------------
# Embedding generation
# -----------------


@st.cache_resource
def load_embedding_model(model_name="mixedbread-ai/mxbai-embed-large-v1"):
    """Loads the embedding model and tokenizer (cached across reruns)."""
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    return tokenizer, model
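
# Note on pooling: generate_embeddings() below mean-pools over every token
# position, including padding tokens, which slightly dilutes embeddings for
# short texts in a padded batch. A mask-aware mean pool is a common
# alternative; a minimal sketch (not wired into the app) would be:
#
#   mask = encoded_input["attention_mask"].unsqueeze(-1).float()
#   summed = (model_output.last_hidden_state * mask).sum(dim=1)
#   embeddings = summed / mask.sum(dim=1).clamp(min=1e-9)
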
@st.cache_data
def generate_embeddings(_tokenizer, _model, text_list):
    """Generates embeddings for a list of text entries."""
    encoded_input = _tokenizer(
        text_list,
        padding=True,
        truncation=True,
        return_tensors="pt",
    )
    with torch.no_grad():
        model_output = _model(**encoded_input)
    embeddings = model_output.last_hidden_state.mean(dim=1)
    return embeddings.cpu().numpy()


# -----------------
# Streamlit app
# -----------------


def main():
    st.title("Veczip Embeddings Tool")
    st.markdown(
        """
        This tool offers two ways to compress your embeddings:

        1. **Compress Your Embeddings:** Upload a CSV file containing pre-existing embeddings and reduce their dimensionality using `dejan.veczip`.
        2. **Generate & Compress Embeddings:** Provide a list of text entries, and this tool will generate embeddings using `mxbai-embed-large-v1` and then compress them.
        """
    )
    st.markdown(
        """
        **General Usage Guide**

        * Both tools work best with larger datasets (hundreds or thousands of entries).
        * For CSV files with embeddings, ensure that embedding columns are formatted as arrays (e.g. `[1,2,3]` or `1,2,3`) and metadata columns as text or numbers.
        * Output embeddings are compressed to 16 dimensions.
        """
    )

    tab1, tab2 = st.tabs(["Compress Your Embeddings", "Generate & Compress Embeddings"])

    with tab1:
        st.header("Compress Your Embeddings")
        st.markdown(
            """
            Upload a CSV file containing pre-existing embeddings. This reduces the
            dimensionality of the embeddings to 16 dimensions using `dejan.veczip`.
            """
        )
        uploaded_file = st.file_uploader(
            "Upload CSV file with embeddings",
            type=["csv"],
            help="Ensure the CSV file has columns where embedding arrays are represented as text, e.g. '[1,2,3]' or '1,2,3'.",
        )
        if uploaded_file:
            try:
                with st.spinner("Analyzing and compressing embeddings..."):
                    temp_file = tempfile.NamedTemporaryFile(delete=False)
                    temp_file.write(uploaded_file.read())
                    temp_file.close()
                    output_file_path = run_veczip(temp_file.name)
                    st.success("Compression complete! Download your compressed file below.")
                    with open(output_file_path, "rb") as f:
                        st.download_button(
                            label="Download Compressed CSV",
                            data=f,
                            file_name="compressed_embeddings.csv",
                            mime="text/csv",
                        )
                    os.unlink(temp_file.name)
                    os.unlink(output_file_path)
            except Exception as e:
                st.error(f"Error processing file: {e}")

    with tab2:
        st.header("Generate & Compress Embeddings")
        st.markdown(
            """
            Provide a list of text entries (one per line), and this tool will:

            1. Generate embeddings using `mixedbread-ai/mxbai-embed-large-v1`.
            2. Compress those embeddings to 16 dimensions using `dejan.veczip`.
            """
        )
        text_input = st.text_area(
            "Enter text entries (one per line)",
            help="Enter each text entry on a new line. This tool works best with a large sample size.",
        )
        generate_button = st.button("Generate and Compress")
        if generate_button and text_input:
            # Drop blank lines so whitespace-only input does not reach the model.
            text_list = [line.strip() for line in text_input.splitlines() if line.strip()]
            if not text_list:
                st.warning("Please enter some text for embedding.")
            else:
                try:
                    with st.spinner("Generating and compressing embeddings..."):
                        tokenizer, model = load_embedding_model()
                        embeddings = generate_embeddings(tokenizer, model, text_list)
                        compressor = veczip(target_dims=16)
                        retained_indices = compressor.compress(embeddings)
                        compressed_embeddings = embeddings[:, retained_indices]
                        df = pd.DataFrame(
                            {"text": text_list, "embeddings": compressed_embeddings.tolist()}
                        )
                        st.success("Generated and compressed! Download your file below.")
                        st.dataframe(df)
                        csv_file = df.to_csv(index=False).encode("utf-8")
                        st.download_button(
                            label="Download Compressed Embeddings (CSV)",
                            data=csv_file,
                            file_name="generated_compressed_embeddings.csv",
                            mime="text/csv",
                        )
                except Exception as e:
                    st.error(f"Error: {e}")


if __name__ == "__main__":
    main()
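
# Headless usage (illustrative; assumes an "embeddings.csv" with one or more
# embedding columns exists next to this script):
#
#   out_path = run_veczip("embeddings.csv", target_dims=16)
#   print(f"Compressed CSV written to {out_path}")
#
# To run the app itself:  streamlit run <this_file>.py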