import csv
import random

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
import cv2
from deepface import DeepFace
import gradio as gr
import matplotlib.pyplot as plt
from transformers import pipeline
from PIL import Image
from ntscraper import Nitter
def video_sentiment_score(sentiment):
    """Map a facial-emotion label to a signed sentiment score.

    Both the "surprised" spelling and the "surprise" spelling are accepted
    (DeepFace's emotion model reports the latter), and "fear" / "disgust"
    carry the same weights that ``text_emotion_score`` gives them.

    Args:
        sentiment: Emotion label such as "happy" or "sad".

    Returns:
        A float: positive for pleasant emotions, negative for unpleasant
        ones, and 0.0 for "neutral" or any label that is not recognised.
    """
    sentiment_mapping = {
        "happy": 1.0,
        "sad": -1.0,
        "angry": -1.5,
        "surprised": 0.5,
        "surprise": 0.5,
        "fear": -1.0,
        "disgust": -1.5,
        "neutral": 0.0,
    }
    return sentiment_mapping.get(sentiment, 0.0)
def text_emotion_score(emotion):
    """Return the signed sentiment weight for a text-emotion label.

    Args:
        emotion: Label such as "joy", "sadness" or "anger".

    Returns:
        The weight assigned to the label, or 0.0 when the label is not one
        of the seven known emotions.
    """
    weights = dict(
        joy=1.0,
        sadness=-1.0,
        anger=-1.5,
        surprise=0.5,
        neutral=0.0,
        disgust=-1.5,
        fear=-1.0,
    )
    try:
        return weights[emotion]
    except KeyError:
        # Unknown labels count as neutral.
        return 0.0
def environment_score(environment):
    """Convert an environment rating into a numeric score.

    Args:
        environment: One of "Good", "Moderate" or "Bad".

    Returns:
        1.0 for "Good", 0.0 for "Moderate", -1.0 for "Bad", and 0.0 for any
        other value.
    """
    score_by_rating = dict(Good=1.0, Moderate=0.0, Bad=-1.0)
    if environment in score_by_rating:
        return score_by_rating[environment]
    return 0.0
def scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date):
    """Scrape tweets for a hashtag via Nitter, save them to CSV and return them.

    Args:
        hashtag: Hashtag to search for (the CSV title row prefixes it with "#").
        mode: Accepted for interface compatibility only; the search always
            runs with ``mode='hashtag'``.
        num_of_tweets: Number of tweets to request; converted with ``int()``.
        since_date: Lower date bound passed to the scraper as ``since``.
            Blank values are sent as ``None``.
        until_date: Upper date bound passed to the scraper as ``until``.
            Blank values are sent as ``None``.

    Returns:
        A DataFrame with the columns ``User``, ``Username``, ``Tweet`` and
        ``Date``; it has no rows when the scraper returned nothing.

    Side effects:
        Overwrites ``tweets_kuru.csv`` in the current working directory and
        replaces httpx's module-level default timeout with a 3-second one.
    """
    print(f"num_of_tweets before conversion: {num_of_tweets}")
    num_of_tweets = int(num_of_tweets)

    import httpx
    httpx._config.DEFAULT_TIMEOUT = httpx.Timeout(3.0)

    scraper = Nitter()
    tweets = scraper.get_tweets(
        hashtag,
        mode='hashtag',
        number=num_of_tweets,
        # An empty text box yields ''; pass None so no date filter is sent.
        since=since_date or None,
        until=until_date or None,
    )

    # Tolerate a missing/None result instead of failing on tweets['tweets'].
    records = (tweets or {}).get('tweets', [])
    columns = ['User', 'Username', 'Tweet', 'Date']
    final_tweets = [
        [tweet['user']['name'], tweet['user']['username'], tweet['text'], tweet['date']]
        for tweet in records
    ]

    # newline='' lets the csv module control line endings; without it extra
    # blank rows appear on platforms that translate '\n'.
    with open('tweets_kuru.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([f'Scraping Tweets for #{hashtag}'])
        writer.writerow(columns)
        writer.writerows(final_tweets)

    tweet_df = pd.DataFrame(final_tweets, columns=columns)
    return tweet_df
# Synthetic sensor readings for the environment classifier trained further
# down. The seed is fixed and the draws happen in a fixed order, so every run
# produces the same arrays. `high` is exclusive in np.random.randint.
np.random.seed(42)
data_size = 1000

aqi_values = np.random.randint(low=0, high=500, size=data_size)
noise_levels = np.random.randint(low=30, high=110, size=data_size)
temperatures = np.random.randint(low=-10, high=40, size=data_size)
humidity_levels = np.random.randint(low=10, high=90, size=data_size)
pm25_values = np.random.randint(low=0, high=500, size=data_size)
co2_levels = np.random.randint(low=250, high=6000, size=data_size)
def classify_environment(aqi, noise, temp, humidity, pm25, co2):
    """Rate environmental readings as "Good", "Moderate" or "Bad".

    A single reading strictly above its "bad" limit makes the result "Bad";
    otherwise a single reading strictly above its "moderate" limit makes it
    "Moderate"; otherwise the result is "Good".

    Args:
        aqi: Air-quality index reading.
        noise: Noise-level reading.
        temp: Temperature reading.
        humidity: Humidity reading.
        pm25: PM2.5 reading.
        co2: CO2 reading.

    Returns:
        The rating string.
    """
    readings = (aqi, noise, temp, humidity, pm25, co2)
    # Limits are listed in the same order as `readings`.
    bad_limits = (150, 80, 35, 80, 55, 2000)
    moderate_limits = (100, 60, 30, 60, 35, 1000)

    if any(value > limit for value, limit in zip(readings, bad_limits)):
        return "Bad"
    if any(value > limit for value, limit in zip(readings, moderate_limits)):
        return "Moderate"
    return "Good"
# Label every synthetic reading with the rule-based classifier, then fit a
# random forest on those labels, report hold-out accuracy and save the model.
labels = [
    classify_environment(*reading)
    for reading in zip(aqi_values, noise_levels, temperatures,
                       humidity_levels, pm25_values, co2_levels)
]

data = pd.DataFrame({
    'AQI': aqi_values,
    'Noise': noise_levels,
    'Temperature': temperatures,
    'Humidity': humidity_levels,
    'PM2.5': pm25_values,
    'CO2': co2_levels,
}).assign(Label=labels)

# Features are every column except the label, in their original order.
X = data.drop(columns='Label')
y = data['Label']

# 80/20 split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

model = RandomForestClassifier().fit(X_train, y_train)
predictions = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, predictions))

joblib.dump(model, 'environment_model.pkl')
def analyze_video_sentiment(video_path, num_frames=10, detector_backend='retinaface'):
    """Estimate the dominant facial emotion in a video from sampled frames.

    Up to ``num_frames`` distinct frames are picked at random and analysed
    with DeepFace. Each detected face is boxed and labelled on its frame, and
    its dominant emotion is tallied.

    Args:
        video_path: Path of the video file to open with OpenCV.
        num_frames: Number of frames to sample; capped at the number of
            frames the video reports.
        detector_backend: Face-detector backend name forwarded to DeepFace.

    Returns:
        ``(dominant_emotion, frame_images)``. ``dominant_emotion`` is one of
        "happy", "sad", "angry", "surprised" or "neutral" ("neutral" when no
        emotion was tallied at all). ``frame_images`` is a list of annotated
        PIL images in chronological order; frames that could not be read or
        in which no face was found are left out.

    Raises:
        ValueError: Re-raised from DeepFace for any failure other than
            "no face in this frame".
    """
    emotions = {"happy": 0, "sad": 0, "angry": 0, "surprised": 0, "neutral": 0}
    # DeepFace names this emotion "surprise"; fold it into the "surprised"
    # key used here so those faces are counted instead of silently dropped.
    label_aliases = {"surprise": "surprised"}
    frame_images = []

    cap = cv2.VideoCapture(video_path)
    try:
        # An unreadable video reports 0 frames (some containers report a
        # negative count); clamp so sampling never fails.
        total_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
        # random.sample raises ValueError when asked for more items than
        # exist, so never request more frames than the video has.
        sample_size = max(0, min(int(num_frames), total_frames))
        frame_indices = sorted(random.sample(range(total_frames), sample_size))

        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                continue

            try:
                # DeepFace expects numpy images in BGR order, which is what
                # OpenCV delivers, so the frame is passed through unconverted.
                results = DeepFace.analyze(frame, actions=["emotion"], enforce_detection=True,
                                           detector_backend=detector_backend)
            except ValueError as e:
                message = str(e).lower()
                # With enforce_detection=True a frame without a face raises
                # ValueError; skip such frames rather than abort the run.
                if "no face detected" in message or "could not be detected" in message:
                    continue
                raise

            # Older DeepFace releases return a single dict instead of a list.
            if isinstance(results, dict):
                results = [results]

            for result in results:
                if not result:
                    continue
                region = result["region"]
                x1, y1 = region["x"], region["y"]
                x2, y2 = x1 + region["w"], y1 + region["h"]
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                dominant_emotion = result["dominant_emotion"]
                cv2.putText(frame, dominant_emotion, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                            (0, 0, 255), 2)

                tally_key = label_aliases.get(dominant_emotion, dominant_emotion)
                if tally_key in emotions:
                    emotions[tally_key] += 1

            # PIL expects RGB, OpenCV frames are BGR.
            frame_images.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
    finally:
        # Always release the capture handle, even if analysis raised.
        # No OpenCV windows are opened here, so there is nothing to destroy.
        cap.release()

    if not any(emotions.values()):
        # max() over an all-zero tally would return the first key ("happy").
        return "neutral", frame_images

    dominant_emotion = max(emotions, key=emotions.get)
    return dominant_emotion, frame_images
# Shared text-emotion classifier. With all scores requested, callers in this
# file treat each prediction as a list of {'label', 'score'} dicts.
# NOTE(review): `return_all_scores` is reported as deprecated in newer
# transformers releases in favour of `top_k=None` - confirm the installed
# version before switching.
classifier = pipeline(
    task="text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,
)
def classify_tweets(tweets_df):
    """Label each tweet with its top-scoring emotion and chart the distribution.

    Args:
        tweets_df: DataFrame with a ``Tweet`` column holding the tweet text.
            It is not modified.

    Returns:
        ``(tweet_df, fig)``. ``tweet_df`` is a copy of the input with an
        added ``Sentiment`` column; ``fig`` is a matplotlib bar chart of the
        label counts. For an empty input the copy has no rows (the
        ``Sentiment`` column is still present) and ``fig`` is ``None``, so
        callers can always unpack two values.
    """
    if tweets_df.empty:
        # Keep the two-value return shape; callers unpack the result.
        tweet_df = tweets_df.iloc[0:0].copy()
        tweet_df['Sentiment'] = pd.Series(dtype='object')
        return tweet_df, None

    tweet_df = tweets_df.copy()
    # Missing text becomes '' so the tokenizer only ever sees strings.
    tweet_texts = tweet_df['Tweet'].fillna('').astype(str).tolist()
    # truncation=True keeps over-long posts within the model's input limit.
    results = classifier(tweet_texts, truncation=True)
    tweet_df['Sentiment'] = [
        max(scores, key=lambda item: item['score'])['label'] for scores in results
    ]

    sentiment_counts = tweet_df['Sentiment'].value_counts()
    fig, ax = plt.subplots(figsize=(8, 5))
    bars = ax.bar(sentiment_counts.index, sentiment_counts.values,
                  color=['#FF6F61', '#6B5B95', '#88B04B', '#F7CAC9', '#92A8D1', '#955251'])
    ax.set_xlabel('Sentiment', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Count', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Tweet Sentiment Distribution', fontsize=18, fontweight='bold', color='#2E4053')
    ax.tick_params(axis='x', rotation=0, colors='#34495E', labelsize=12)
    ax.tick_params(axis='y', colors='#34495E', labelsize=12)
    for side in ('top', 'right', 'left', 'bottom'):
        ax.spines[side].set_visible(False)
    ax.yaxis.grid(True, linestyle='--', which='major', color='grey', alpha=.45)
    ax.xaxis.set_tick_params(width=0)

    # Print each count just above its bar.
    for bar in bars:
        yval = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2, yval + 0.01, round(yval, 2), ha='center', va='bottom',
                color='#34495E', fontsize=12, fontweight='bold')

    # Lay out this figure explicitly rather than whichever one is current.
    fig.tight_layout()
    return tweet_df, fig
def classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date):
    """Scrape tweets for a hashtag, classify their emotions and chart them.

    Args:
        hashtag: Hashtag to search for.
        mode: Forwarded to ``scrape_tweets``.
        num_of_tweets: Number of tweets to request.
        since_date: Lower date bound for the search.
        until_date: Upper date bound for the search.

    Returns:
        ``(tweet_df, fig)``: the classified tweets and the distribution
        chart. When classification yields no two-value result (nothing to
        analyse), the scraped DataFrame is returned unchanged with ``None``
        in place of the chart.
    """
    tweet_df = scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date)
    result = classify_tweets(tweet_df)
    # Guard the unpacking: a status message (or any other non-pair result)
    # would otherwise raise "too many values to unpack".
    if not isinstance(result, tuple) or len(result) != 2:
        return tweet_df, None
    tweet_df, fig = result
    return tweet_df, fig
def classify_overall_sentiment(video, hashtag, mode, num_of_tweets, since_date, until_date, aqi, noise, temp,
                               humidity, pm25, co2):
    """Combine the video, tweet and environment analyses into one report.

    Args:
        video: Video passed to ``analyze_video_sentiment``.
        hashtag, mode, num_of_tweets, since_date, until_date: Tweet-search
            settings passed to ``classify_and_plot``.
        aqi, noise, temp, humidity, pm25, co2: Readings passed to
            ``classify_environment``.

    Returns:
        A 5-tuple: the summary string, the annotated video frames, the
        classified tweets DataFrame, the tweet distribution plot and a line
        plot of the three per-source sentiment scores.
    """
    video_sentiment, frame_images = analyze_video_sentiment(video)
    tweet_df, plot = classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date)

    # The most frequent tweet label stands for the text emotion.
    if tweet_df.empty:
        text_emotion = "No text analyzed"
    else:
        text_emotion = tweet_df['Sentiment'].value_counts().idxmax()

    environment = classify_environment(aqi, noise, temp, humidity, pm25, co2)
    overall_sentiment = f"Video Sentiment: {video_sentiment}, Environment Sentiment: {environment}, Text Emotion: {text_emotion}"

    # One numeric score per source, in plotting order.
    score_by_source = {
        "Video Sentiment": video_sentiment_score(video_sentiment),
        "Text Emotion": text_emotion_score(text_emotion),
        "Environment Sentiment": environment_score(environment),
    }

    fig, ax = plt.subplots()
    ax.plot(list(score_by_source.keys()), list(score_by_source.values()), marker='o')
    axis_label_style = dict(fontsize=14, fontweight='bold', color='#34495E')
    ax.set_xlabel('Sentiment Source', **axis_label_style)
    ax.set_ylabel('Sentiment Score', **axis_label_style)
    ax.set_title('Overall Sentiment Scores', fontsize=18, fontweight='bold', color='#2E4053')
    plt.tight_layout()

    return overall_sentiment, frame_images, tweet_df, plot, fig
example_video = "TimesSquare.mp4"


def _tweet_query_inputs():
    """Return a fresh list of the five tweet-search input components."""
    return [
        gr.Textbox(label="Hashtag"),
        gr.Radio(["latest", "top"], label="Mode"),
        gr.Slider(1, 1000, step=1, label="Number of Tweets"),
        gr.Textbox(label="Since Date (YYYY-MM-DD)"),
        gr.Textbox(label="Until Date (YYYY-MM-DD)"),
    ]


def _environment_inputs():
    """Return a fresh list of the six environment-reading sliders."""
    slider_specs = (
        ("AQI", 0, 500),
        ("Noise", 0, 110),
        ("Temperature", -10, 50),
        ("Humidity", 0, 100),
        ("PM2.5", 0, 500),
        ("CO2", 250, 6000),
    )
    return [
        gr.Slider(minimum=low, maximum=high, step=1, label=label)
        for label, low, high in slider_specs
    ]


# Tab 1: emotion analysis of sampled video frames.
video_interface = gr.Interface(
    fn=analyze_video_sentiment,
    inputs=[
        gr.Video(value=example_video),
        gr.Slider(minimum=1, maximum=20, step=1),
        gr.Radio(["retinaface", "mtcnn", "opencv", "ssd", "dlib", "mediapipe"], label="Detector Backend", value="retinaface"),
    ],
    outputs=["text", gr.Gallery(label="Analyzed Frames")],
    title="Video Sentiment Analysis",
)

# Tab 2: scrape tweets, classify them and plot the label distribution.
text_interface = gr.Interface(
    fn=classify_and_plot,
    inputs=_tweet_query_inputs(),
    outputs=[gr.DataFrame(label="Scraped Tweets"), gr.Plot()],
    title="Social Media Sentiment Analysis",
)

# Tab 3: rule-based rating of the environment readings.
environment_interface = gr.Interface(
    fn=classify_environment,
    inputs=_environment_inputs(),
    outputs="text",
    title="Environment Sentiment Analysis",
)

# Tab 4: all three analyses combined; inputs are video, tweet query, readings.
overall_interface = gr.Interface(
    fn=classify_overall_sentiment,
    inputs=[gr.Video(), *_tweet_query_inputs(), *_environment_inputs()],
    outputs=["text", gr.Gallery(), gr.DataFrame(), gr.Plot(), gr.Plot()],
    title="Overall Sentiment Analysis",
)

# Tab 5: tweet scraping only.
scraper_interface = gr.Interface(
    fn=scrape_tweets,
    inputs=_tweet_query_inputs(),
    outputs=gr.DataFrame(),
    title="Scrape Tweets",
)

tabbed_interface = gr.TabbedInterface(
    [video_interface, text_interface, environment_interface, overall_interface, scraper_interface],
    ["Video Sentiment Analysis", "Social Media Sentiment Analysis", "Environment Sentiment Analysis",
     "Overall Sentiment Analysis", "Scrape Tweets"],
)
tabbed_interface.launch(debug=True, share=True)