Spaces:
Sleeping
Sleeping
Create tweet_analyzer.py
Browse files- tweet_analyzer.py +166 -0
tweet_analyzer.py
ADDED
@@ -0,0 +1,166 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import os
|
2 |
+
from PyPDF2 import PdfReader
|
3 |
+
import pandas as pd
|
4 |
+
from dotenv import load_dotenv
|
5 |
+
from transformers import GPT2LMHeadModel, GPT2Tokenizer
|
6 |
+
import json
|
7 |
+
from datetime import datetime
|
8 |
+
from sklearn.decomposition import NMF
|
9 |
+
from sklearn.feature_extraction.text import TfidfVectorizer
|
10 |
+
from sklearn.cluster import KMeans
|
11 |
+
import random
|
12 |
+
from joblib import Parallel, delayed
|
13 |
+
|
14 |
+
class TweetDatasetProcessor:
|
15 |
+
def __init__(self):
|
16 |
+
load_dotenv()
|
17 |
+
# Load the fine-tuned GPT model and tokenizer
|
18 |
+
self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2') # Use your fine-tuned model path here
|
19 |
+
self.model = GPT2LMHeadModel.from_pretrained('path_to_finetuned_model') # Path to your fine-tuned model
|
20 |
+
self.tweets = []
|
21 |
+
self.personality_profile = ""
|
22 |
+
self.vectorizer = TfidfVectorizer(stop_words='english')
|
23 |
+
self.used_tweets = set() # Track used tweets to avoid repetition
|
24 |
+
|
25 |
+
@staticmethod
|
26 |
+
def _process_line(line):
|
27 |
+
"""Process a single line."""
|
28 |
+
line = line.strip()
|
29 |
+
if not line or line.startswith('http'): # Skip empty lines and URLs
|
30 |
+
return None
|
31 |
+
return {
|
32 |
+
'content': line,
|
33 |
+
'timestamp': datetime.now(),
|
34 |
+
'mentions': [word for word in line.split() if word.startswith('@')],
|
35 |
+
'hashtags': [word for word in line.split() if word.startswith('#')]
|
36 |
+
}
|
37 |
+
|
38 |
+
def extract_text_from_pdf(self, pdf_path):
|
39 |
+
"""Extract text content from PDF file."""
|
40 |
+
reader = PdfReader(pdf_path)
|
41 |
+
text = ""
|
42 |
+
for page in reader.pages:
|
43 |
+
text += page.extract_text()
|
44 |
+
return text
|
45 |
+
|
46 |
+
def process_pdf_content(self, text):
|
47 |
+
"""Process PDF content and clean extracted tweets."""
|
48 |
+
if not text.strip():
|
49 |
+
raise ValueError("The uploaded PDF appears to be empty.")
|
50 |
+
|
51 |
+
lines = text.split('\n')
|
52 |
+
# Pass the static method explicitly
|
53 |
+
clean_tweets = Parallel(n_jobs=-1)(delayed(TweetDatasetProcessor._process_line)(line) for line in lines)
|
54 |
+
self.tweets = [tweet for tweet in clean_tweets if tweet]
|
55 |
+
|
56 |
+
if not self.tweets:
|
57 |
+
raise ValueError("No tweets were extracted from the PDF. Ensure the content is properly formatted.")
|
58 |
+
|
59 |
+
# Save the processed tweets to a CSV
|
60 |
+
df = pd.DataFrame(self.tweets)
|
61 |
+
df.to_csv('processed_tweets.csv', index=False)
|
62 |
+
return df
|
63 |
+
|
64 |
+
def _extract_mentions(self, text):
|
65 |
+
"""Extract mentioned users from tweet."""
|
66 |
+
return [word for word in text.split() if word.startswith('@')]
|
67 |
+
|
68 |
+
def _extract_hashtags(self, text):
|
69 |
+
"""Extract hashtags from tweet."""
|
70 |
+
return [word for word in text.split() if word.startswith('#')]
|
71 |
+
|
72 |
+
def categorize_tweets(self):
|
73 |
+
"""Cluster tweets into categories using KMeans."""
|
74 |
+
all_tweets = [tweet['content'] for tweet in self.tweets]
|
75 |
+
if not all_tweets:
|
76 |
+
raise ValueError("No tweets available for clustering.")
|
77 |
+
|
78 |
+
tfidf_matrix = self.vectorizer.fit_transform(all_tweets)
|
79 |
+
kmeans = KMeans(n_clusters=5, random_state=1)
|
80 |
+
kmeans.fit(tfidf_matrix)
|
81 |
+
|
82 |
+
for i, tweet in enumerate(self.tweets):
|
83 |
+
tweet['category'] = f"Category {kmeans.labels_[i]}"
|
84 |
+
return pd.DataFrame(self.tweets)
|
85 |
+
|
86 |
+
def analyze_personality(self, max_tweets=50):
|
87 |
+
"""Comprehensive personality analysis using a limited subset of tweets."""
|
88 |
+
if not self.tweets:
|
89 |
+
raise ValueError("No tweets available for personality analysis.")
|
90 |
+
|
91 |
+
all_tweets = [tweet['content'] for tweet in self.tweets][:max_tweets]
|
92 |
+
analysis_prompt = f"""Perform a deep psychological analysis of the author based on these tweets:
|
93 |
+
Core beliefs, emotional tendencies, cognitive patterns, etc.
|
94 |
+
Tweets for analysis:
|
95 |
+
{json.dumps(all_tweets, indent=2)}
|
96 |
+
"""
|
97 |
+
# Prepare input for the fine-tuned model
|
98 |
+
inputs = self.tokenizer(analysis_prompt, return_tensors="pt", truncation=True, padding=True, max_length=512)
|
99 |
+
|
100 |
+
try:
|
101 |
+
# Generate response using the fine-tuned model
|
102 |
+
outputs = self.model.generate(inputs['input_ids'], max_length=500)
|
103 |
+
self.personality_profile = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
|
104 |
+
return self.personality_profile
|
105 |
+
except Exception as e:
|
106 |
+
return f"Error during personality analysis: {str(e)}"
|
107 |
+
|
108 |
+
def analyze_topics(self, n_topics=None):
|
109 |
+
"""Extract and identify different topics the author has tweeted about."""
|
110 |
+
all_tweets = [tweet['content'] for tweet in self.tweets]
|
111 |
+
if not all_tweets:
|
112 |
+
return []
|
113 |
+
|
114 |
+
n_topics = n_topics or min(5, len(all_tweets) // 10)
|
115 |
+
tfidf_matrix = self.vectorizer.fit_transform(all_tweets)
|
116 |
+
nmf_model = NMF(n_components=n_topics, random_state=1)
|
117 |
+
nmf_model.fit(tfidf_matrix)
|
118 |
+
|
119 |
+
topics = []
|
120 |
+
for topic_idx, topic in enumerate(nmf_model.components_):
|
121 |
+
topic_words = [self.vectorizer.get_feature_names_out()[i] for i in topic.argsort()[:-n_topics - 1:-1]]
|
122 |
+
topics.append(" ".join(topic_words))
|
123 |
+
return list(set(topics)) # Remove duplicates
|
124 |
+
|
125 |
+
def count_tokens(self, text):
|
126 |
+
"""Estimate the number of tokens in the given text."""
|
127 |
+
return len(text.split())
|
128 |
+
|
129 |
+
def generate_tweet(self, context="", sample_size=3):
|
130 |
+
"""Generate a new tweet by sampling random tweets and avoiding repetition."""
|
131 |
+
if not self.tweets:
|
132 |
+
return "Error: No tweets available for generation."
|
133 |
+
|
134 |
+
# Randomly sample unique tweets
|
135 |
+
available_tweets = [tweet for tweet in self.tweets if tweet['content'] not in self.used_tweets]
|
136 |
+
if len(available_tweets) < sample_size:
|
137 |
+
self.used_tweets.clear() # Reset used tweets if all have been used
|
138 |
+
available_tweets = self.tweets
|
139 |
+
|
140 |
+
sampled_tweets = random.sample(available_tweets, sample_size)
|
141 |
+
sampled_contents = [tweet['content'] for tweet in sampled_tweets]
|
142 |
+
|
143 |
+
# Update the used tweets tracker
|
144 |
+
self.used_tweets.update(sampled_contents)
|
145 |
+
|
146 |
+
# Truncate personality profile to avoid token overflow
|
147 |
+
personality_profile_excerpt = self.personality_profile[:400] if len(self.personality_profile) > 400 else self.personality_profile
|
148 |
+
|
149 |
+
# Construct the prompt
|
150 |
+
prompt = f"""Based on this personality profile:
|
151 |
+
{personality_profile_excerpt}
|
152 |
+
Current context or topic (if any):
|
153 |
+
{context}
|
154 |
+
Tweets for context:
|
155 |
+
{', '.join(sampled_contents)}
|
156 |
+
**Only generate the tweet. Do not include analysis, explanation, or any other content.**
|
157 |
+
"""
|
158 |
+
inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, padding=True, max_length=512)
|
159 |
+
|
160 |
+
try:
|
161 |
+
# Generate tweet using the fine-tuned model
|
162 |
+
outputs = self.model.generate(inputs['input_ids'], max_length=150)
|
163 |
+
tweet = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
|
164 |
+
return tweet
|
165 |
+
except Exception as e:
|
166 |
+
return f"Error generating tweet: {str(e)}"
|