import gradio as gr # type: ignore import pandas as pd import re import spacy # type: ignore from sklearn.cluster import KMeans from sklearn.metrics.pairwise import cosine_similarity from sklearn.feature_extraction.text import TfidfVectorizer from sentence_transformers import SentenceTransformer, util # type: ignore from transformers import pipeline, AutoTokenizer import textstat # type: ignore sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english") tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english") nlp = spacy.load("en_core_web_sm") model = SentenceTransformer('all-MiniLM-L6-v2') weights = { "information_density": 0.2, "unique_key_points": 0.8, "strength_word_count": 0.002, "weakness_word_count": 0.004, "discussion_word_count": 0.01 } THRESHOLDS = { "normalized_length": (0.15, 0.25), "unique_key_points": (3, 10), "information_density": (0.01, 0.02), "unique_insights_per_word": 0.002, "optimization_score": 0.7, "composite_score": 5, "adjusted_argument_strength": 0.75 } def chunk_text(text, max_length): tokens = tokenizer(text, return_tensors="pt", truncation=False)["input_ids"].squeeze(0).tolist() return [tokenizer.decode(tokens[i:i+max_length]) for i in range(0, len(tokens), max_length)] def analyze_text(texts): results = [] for text in texts: chunks = chunk_text(text, max_length=200) chunk_results = sentiment_analyzer(chunks) overall_sentiment = { "label": "POSITIVE" if sum(1 for res in chunk_results if res["label"] == "POSITIVE") >= len(chunk_results) / 2 else "NEGATIVE", "score": sum(res["score"] for res in chunk_results) / len(chunk_results), } results.append(overall_sentiment) return results def word_count(text): return len(text.split()) if isinstance(text, str) else 0 def count_citations(text): doc = nlp(text) return sum(1 for ent in doc.ents if ent.label_ in ['WORK_OF_ART', 'ORG', 'GPE']) def calculate_unique_insights_per_word(text): sentences = text.split('.') tfidf = TfidfVectorizer().fit_transform(sentences) similarities = cosine_similarity(tfidf) avg_similarity = (similarities.sum() - len(sentences)) / (len(sentences)**2 - len(sentences)) return 1 - avg_similarity def calculate_unique_key_points_and_density(texts): unique_key_points = [] information_density = [] for text in texts: if not isinstance(text, str) or text.strip() == "": unique_key_points.append(0) information_density.append(0) continue doc = nlp(text) sentences = [sent.text for sent in doc.sents] embeddings = model.encode(sentences) n_clusters = max(1, len(sentences) // 5) kmeans = KMeans(n_clusters=n_clusters, random_state=42) kmeans.fit(embeddings) cluster_centers = kmeans.cluster_centers_ unique_points_count = len(cluster_centers) word_count = len(text.split()) density = unique_points_count / word_count if word_count > 0 else 0 unique_key_points.append(unique_points_count) information_density.append(density) return unique_key_points, information_density def segment_comments(comments): if comments == "N/A": return {"strengths": "", "weaknesses": "", "general_discussion": ""} strengths = re.search(r"- Strengths:\n([\s\S]*?)(\n- Weaknesses:|\Z)", comments) weaknesses = re.search(r"- Weaknesses:\n([\s\S]*?)(\n- General Discussion:|\Z)", comments) general_discussion = re.search(r"- General Discussion:\n([\s\S]*?)\Z", comments) return { "strengths": strengths.group(1).strip() if strengths else "", "weaknesses": weaknesses.group(1).strip() if weaknesses else "", "general_discussion": general_discussion.group(1).strip() if general_discussion else "" } def preprocess(comment, abstract): df = pd.DataFrame({"comments": [comment]}) abstracts = pd.DataFrame({"abstract": [abstract]}) segmented_reviews = df["comments"].apply(segment_comments) df["strengths"] = segmented_reviews.apply(lambda x: x["strengths"]) df["weaknesses"] = segmented_reviews.apply(lambda x: x["weaknesses"]) df["general_discussion"] = segmented_reviews.apply(lambda x: x["general_discussion"]) comments_embeddings = model.encode(df['comments'].tolist(), convert_to_tensor=True) abstract_embeddings = model.encode(abstracts["abstract"].tolist(), convert_to_tensor=True) df['content_relevance'] = util.cos_sim(comments_embeddings, abstract_embeddings).diagonal() df['evidence_support'] = df['comments'].apply(count_citations) df['strengths'] = df['strengths'].fillna('').astype(str) texts = df['strengths'].tolist() results = analyze_text(texts) df['strength_argument_score'] = [result['score'] for result in results] df['weaknesses'] = df['weaknesses'].fillna('').astype(str) texts = df['weaknesses'].tolist() results = analyze_text(texts) df['weakness_argument_score'] = [result['score'] for result in results] df['argument_strength'] = (df['strength_argument_score'] + df['weakness_argument_score']) / 2 df['readability_index'] = df['comments'].apply(textstat.flesch_reading_ease) df['sentence_complexity'] = df['comments'].apply(textstat.sentence_count) df['technical_depth'] = df['readability_index'] / df['sentence_complexity'] df['total_word_count'] = df['comments'].apply(word_count) df['strength_word_count'] = df['strengths'].apply(word_count) df['weakness_word_count'] = df['weaknesses'].apply(word_count) df['discussion_word_count'] = df['general_discussion'].apply(word_count) average_length = df['total_word_count'].mean() df['normalized_length'] = df['total_word_count'] / average_length df["unique_key_points"], df["information_density"] = calculate_unique_key_points_and_density(df["comments"]) df['unique_insights_per_word'] = df['comments'].apply(calculate_unique_insights_per_word) / df['total_word_count'] return df def calculate_composite_score(df): df['composite_score'] = ( weights['information_density'] * df['information_density'] + weights['unique_key_points'] * df['unique_key_points'] + weights['strength_word_count'] * df['strength_word_count'] + weights['weakness_word_count'] * df['weakness_word_count'] + weights['discussion_word_count'] * df['discussion_word_count'] ) return df def classify_review_quality(row): if row['composite_score'] > 12: return 'Excellent Review Quality' elif row['composite_score'] < 3: return 'Poor Review Quality' else: return 'Moderate Review Quality' def determine_review_quality(df): df['normalized_length'] = df['total_word_count'] / df['total_word_count'].max() df['unique_insights_per_word'] = df['unique_key_points'] / df['normalized_length'] df['adjusted_argument_strength'] = df['argument_strength'] / (1 + df['sentence_complexity']) df['review_quality'] = df.apply(classify_review_quality, axis=1) return df def heuristic_optimization(row): suggestions = [] if row["strength_word_count"] > 100 and row["strength_argument_score"] < THRESHOLDS["adjusted_argument_strength"]: suggestions.append("Summarize redundant strengths.") elif row["strength_word_count"] < 50 and row["strength_argument_score"] < THRESHOLDS["adjusted_argument_strength"]: suggestions.append("Add more impactful strengths.") if row["weakness_word_count"] > 100 and row["weakness_argument_score"] < THRESHOLDS["adjusted_argument_strength"]: suggestions.append("Remove repetitive criticisms.") elif row["weakness_word_count"] < 50 and row["weakness_argument_score"] < THRESHOLDS["adjusted_argument_strength"]: suggestions.append("Add specific, actionable weaknesses.") if row["discussion_word_count"] < 100 and row["information_density"] < THRESHOLDS["information_density"][0]: suggestions.append("Elaborate with new insights or examples.") elif row["discussion_word_count"] > 300 and row["information_density"] > THRESHOLDS["information_density"][1]: suggestions.append("Summarize key discussion points.") if row["normalized_length"] < THRESHOLDS["normalized_length"][0]: suggestions.append("Expand sections for better coverage.") elif row["normalized_length"] > THRESHOLDS["normalized_length"][1]: suggestions.append("Condense content to improve readability.") if row["unique_key_points"] < THRESHOLDS["unique_key_points"][0]: suggestions.append("Add more unique insights.") elif row["unique_key_points"] > THRESHOLDS["unique_key_points"][1]: suggestions.append("Streamline ideas for clarity.") if row["composite_score"] < THRESHOLDS["composite_score"]: suggestions.append("Enhance clarity, evidence, and argumentation.") if row["review_quality"] == "Low": suggestions.append("Significant revisions required.") elif row["review_quality"] == "Moderate": suggestions.append("Minor refinements recommended.") return suggestions def pipeline(comment, abstract): df = preprocess(comment, abstract) df = calculate_composite_score(df) df = determine_review_quality(df) df["optimization_suggestions"] = df.apply(heuristic_optimization, axis=1) return df["review_quality"][0], " ".join(df["optimization_suggestions"][0]) with gr.Blocks() as demo: gr.Markdown("# Dynaic Length Optimization of Peer Review") with gr.Row(): comment = gr.Textbox(label="Peer Review Comments") abstract = gr.Textbox(label="Paper Abstract") review_quality = gr.Textbox(label="Predicted Review Quality") suggestions = gr.Textbox(label="Suggestions") comment.change(fn=pipeline, inputs=[comment, abstract], outputs=[review_quality, suggestions]) iface = gr.Interface( fn=pipeline, inputs=[gr.Textbox(label="Peer Review Comments"), gr.Textbox(label="Paper Abstract")], outputs=[gr.Textbox(label="Predicted Review Quality"), gr.Textbox(label="Suggestions")], title="# Dynamic Length Optimization of Peer Review", description="A framework which dynamically provides suggestion to improve a peer review.", ) if __name__ == "__main__": iface.launch()