import csv
import random

import cv2
import gradio as gr
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from deepface import DeepFace
from ntscraper import Nitter
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from transformers import pipeline


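# Score mappings: each modality (video emotion, text emotion, environment
# class) is converted to a rough numeric score on a shared scale so the three
# sources can be compared on one line plot.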
def video_sentiment_score(sentiment):
    # Keys follow DeepFace's emotion labels ("surprise", not "surprised");
    # unrecognised labels fall back to a neutral 0.0.
    sentiment_mapping = {
        "happy": 1.0,
        "sad": -1.0,
        "angry": -1.5,
        "surprise": 0.5,
        "neutral": 0.0
    }
    return sentiment_mapping.get(sentiment, 0.0)


def text_emotion_score(emotion):
    # Keys follow the label set of the emotion-english-distilroberta-base model.
    emotion_mapping = {
        "joy": 1.0,
        "sadness": -1.0,
        "anger": -1.5,
        "surprise": 0.5,
        "neutral": 0.0,
        "disgust": -1.5,
        "fear": -1.0
    }
    return emotion_mapping.get(emotion, 0.0)


def environment_score(environment):
    environment_mapping = {
        "Good": 1.0,
        "Moderate": 0.0,
        "Bad": -1.0
    }
    return environment_mapping.get(environment, 0.0)


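# Scrape tweets for a hashtag via Nitter, dump them to a CSV, and return them
# as a DataFrame for downstream emotion classification.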
def scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date):
    print(f"num_of_tweets before conversion: {num_of_tweets}")
    num_of_tweets = int(num_of_tweets)

    # Best-effort tweak of httpx's default timeout; this touches a private
    # attribute and may be a no-op depending on the installed httpx version.
    import httpx
    httpx._config.DEFAULT_TIMEOUT = httpx.Timeout(3.0)

    scraper = Nitter()

    # Note: the `mode` argument from the UI ("latest"/"top") is not forwarded;
    # the Nitter query always runs in hashtag mode.
    tweets = scraper.get_tweets(
        hashtag,
        mode='hashtag',
        number=num_of_tweets,
        since=since_date,
        until=until_date
    )

    final_tweets = []

    # newline='' prevents csv.writer from inserting blank rows on Windows.
    with open('tweets_kuru.csv', 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow([f'Scraping Tweets for #{hashtag}'])
        writer.writerow(['User', 'Username', 'Tweet', 'Date'])

        for tweet in tweets.get('tweets', []):
            tweet_details = [tweet['user']['name'], tweet['user']['username'], tweet['text'], tweet['date']]
            writer.writerow(tweet_details)
            final_tweets.append(tweet_details)

    tweet_df = pd.DataFrame(final_tweets, columns=['User', 'Username', 'Tweet', 'Date'])
    return tweet_df


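# Synthetic environmental readings (AQI, noise, temperature, humidity, PM2.5,
# CO2) used to train the environment classifier.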
np.random.seed(42)
data_size = 1000
aqi_values = np.random.randint(0, 500, size=data_size)
noise_levels = np.random.randint(30, 110, size=data_size)
temperatures = np.random.randint(-10, 40, size=data_size)
humidity_levels = np.random.randint(10, 90, size=data_size)
pm25_values = np.random.randint(0, 500, size=data_size)
co2_levels = np.random.randint(250, 6000, size=data_size)


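# Rule-based labelling used to generate training targets: "Bad" if any reading
# clearly exceeds its threshold, "Moderate" for milder exceedances, otherwise
# "Good".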
def classify_environment(aqi, noise, temp, humidity, pm25, co2):
    if aqi > 150 or noise > 80 or temp > 35 or humidity > 80 or pm25 > 55 or co2 > 2000:
        return "Bad"
    elif aqi > 100 or noise > 60 or temp > 30 or humidity > 60 or pm25 > 35 or co2 > 1000:
        return "Moderate"
    else:
        return "Good"


labels = [classify_environment(aqi, noise, temp, humidity, pm25, co2)
          for aqi, noise, temp, humidity, pm25, co2 in
          zip(aqi_values, noise_levels, temperatures, humidity_levels, pm25_values, co2_levels)]

data = pd.DataFrame({
    'AQI': aqi_values,
    'Noise': noise_levels,
    'Temperature': temperatures,
    'Humidity': humidity_levels,
    'PM2.5': pm25_values,
    'CO2': co2_levels,
    'Label': labels
})


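# Train a RandomForest to reproduce the rule-based labels and save it with
# joblib so it can later be reloaded without regenerating the data.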
X = data[['AQI', 'Noise', 'Temperature', 'Humidity', 'PM2.5', 'CO2']]
y = data['Label']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)

predictions = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, predictions))

joblib.dump(model, 'environment_model.pkl')


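# Video sentiment: sample random frames, run DeepFace emotion detection on
# each one, annotate any detected faces, and report the most frequent
# dominant emotion across the sampled frames.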
def analyze_video_sentiment(video_path, num_frames=10, detector_backend='retinaface'):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Sample at most as many frames as the video actually contains.
    num_frames = min(int(num_frames), total_frames)
    frame_indices = random.sample(range(total_frames), num_frames)
    # DeepFace reports the label "surprise" (not "surprised").
    emotions = {"happy": 0, "sad": 0, "angry": 0, "surprise": 0, "neutral": 0}
    frame_images = []

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            continue

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        try:
            results = DeepFace.analyze(rgb_frame, actions=["emotion"], enforce_detection=True,
                                       detector_backend=detector_backend)

            for result in results:
                if result is None or result == {}:
                    continue

                # Draw a box and the dominant emotion on the original BGR frame.
                face_coordinates = result["region"]
                x1 = face_coordinates["x"]
                y1 = face_coordinates["y"]
                x2 = face_coordinates["x"] + face_coordinates["w"]
                y2 = face_coordinates["y"] + face_coordinates["h"]
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                dominant_emotion = result["dominant_emotion"]
                cv2.putText(frame, dominant_emotion, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

                if dominant_emotion in emotions:
                    emotions[dominant_emotion] += 1

        except ValueError as e:
            # With enforce_detection=True, DeepFace raises ValueError when it
            # cannot find a face in the frame; skip those frames.
            if "face" in str(e).lower():
                continue
            else:
                raise

        frame_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frame_images.append(frame_image)

    cap.release()
    cv2.destroyAllWindows()

    dominant_emotion = max(emotions, key=emotions.get)

    return dominant_emotion, frame_images


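# Pre-trained emotion classifier for tweet text; return_all_scores=True yields
# every label's score so the top one can be picked per tweet.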
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base",
                      return_all_scores=True)


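# Classify each scraped tweet's emotion and summarise the distribution in a
# styled bar chart.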
def classify_tweets(tweets_df):
    if tweets_df.empty:
        # Return an empty result pair so callers can still unpack (df, fig).
        return tweets_df, None

    tweet_texts = tweets_df['Tweet'].tolist()
    results = classifier(tweet_texts)
    emotions = [max(result, key=lambda x: x['score'])['label'] for result in results]
    tweet_df = tweets_df.copy()
    tweet_df['Sentiment'] = emotions

    sentiment_counts = tweet_df['Sentiment'].value_counts()
    fig, ax = plt.subplots(figsize=(8, 5))
    bars = ax.bar(sentiment_counts.index, sentiment_counts.values,
                  color=['#FF6F61', '#6B5B95', '#88B04B', '#F7CAC9', '#92A8D1', '#955251'])

    ax.set_xlabel('Sentiment', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Count', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Tweet Sentiment Distribution', fontsize=18, fontweight='bold', color='#2E4053')
    ax.tick_params(axis='x', rotation=0, colors='#34495E', labelsize=12)
    ax.tick_params(axis='y', colors='#34495E', labelsize=12)
    for spine in ['top', 'right', 'left', 'bottom']:
        ax.spines[spine].set_visible(False)
    ax.yaxis.grid(True, linestyle='--', which='major', color='grey', alpha=.45)
    ax.xaxis.set_tick_params(width=0)

    # Label each bar with its count.
    for bar in bars:
        yval = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2, yval + 0.01, round(yval, 2), ha='center', va='bottom',
                color='#34495E', fontsize=12, fontweight='bold')

    plt.tight_layout()
    return tweet_df, fig


def classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date):
    tweet_df = scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date)
    tweet_df, fig = classify_tweets(tweet_df)
    return tweet_df, fig


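# Combine the three sources: dominant video emotion, most frequent tweet
# emotion, and the rule-based environment class, plus a small line plot of
# their numeric scores.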
def classify_overall_sentiment(video, hashtag, mode, num_of_tweets, since_date, until_date, aqi, noise, temp,
                               humidity, pm25, co2):
    video_sentiment, frame_images = analyze_video_sentiment(video)

    tweet_df, plot = classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date)
    text_emotion = tweet_df['Sentiment'].value_counts().idxmax() if not tweet_df.empty else "No text analyzed"

    environment = classify_environment(aqi, noise, temp, humidity, pm25, co2)

    overall_sentiment = (f"Video Sentiment: {video_sentiment}, "
                         f"Environment Sentiment: {environment}, "
                         f"Text Emotion: {text_emotion}")

    sentiments = ["Video Sentiment", "Text Emotion", "Environment Sentiment"]
    scores = [video_sentiment_score(video_sentiment), text_emotion_score(text_emotion), environment_score(environment)]

    fig, ax = plt.subplots()
    ax.plot(sentiments, scores, marker='o')

    ax.set_xlabel('Sentiment Source', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Sentiment Score', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Overall Sentiment Scores', fontsize=18, fontweight='bold', color='#2E4053')

    plt.tight_layout()

    return overall_sentiment, frame_images, tweet_df, plot, fig


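# Gradio front end: one tab per analysis, a combined "overall" tab, and a raw
# tweet scraper.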
example_video = "TimesSquare.mp4"

video_interface = gr.Interface(
    fn=analyze_video_sentiment,
    inputs=[
        gr.Video(value=example_video),
        gr.Slider(minimum=1, maximum=20, step=1, value=10, label="Number of Frames"),
        gr.Radio(["retinaface", "mtcnn", "opencv", "ssd", "dlib", "mediapipe"], label="Detector Backend", value="retinaface")
    ],
    outputs=["text", gr.Gallery(label="Analyzed Frames")],
    title="Video Sentiment Analysis",
)


text_interface = gr.Interface(
    fn=classify_and_plot,
    inputs=[gr.Textbox(label="Hashtag"),
            gr.Radio(["latest", "top"], label="Mode"),
            gr.Slider(1, 1000, step=1, label="Number of Tweets"),
            gr.Textbox(label="Since Date (YYYY-MM-DD)"),
            gr.Textbox(label="Until Date (YYYY-MM-DD)")],
    outputs=[gr.DataFrame(label="Scraped Tweets"), gr.Plot()],
    title="Social Media Sentiment Analysis"
)


environment_interface = gr.Interface(
    fn=classify_environment,
    inputs=[gr.Slider(minimum=0, maximum=500, step=1, label="AQI"),
            gr.Slider(minimum=0, maximum=110, step=1, label="Noise"),
            gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"),
            gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"),
            gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"),
            gr.Slider(minimum=250, maximum=6000, step=1, label="CO2")],
    outputs="text",
    title="Environment Sentiment Analysis"
)


overall_interface = gr.Interface(
    fn=classify_overall_sentiment,
    inputs=[gr.Video(),
            gr.Textbox(label="Hashtag"), gr.Radio(["latest", "top"], label="Mode"),
            gr.Slider(1, 1000, step=1, label="Number of Tweets"),
            gr.Textbox(label="Since Date (YYYY-MM-DD)"), gr.Textbox(label="Until Date (YYYY-MM-DD)"),
            gr.Slider(minimum=0, maximum=500, step=1, label="AQI"),
            gr.Slider(minimum=0, maximum=110, step=1, label="Noise"),
            gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"),
            gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"),
            gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"),
            gr.Slider(minimum=250, maximum=6000, step=1, label="CO2")],
    outputs=["text", gr.Gallery(), gr.DataFrame(), gr.Plot(), gr.Plot()],
    title="Overall Sentiment Analysis"
)


scraper_interface = gr.Interface(
    fn=scrape_tweets,
    inputs=[gr.Textbox(label="Hashtag"),
            gr.Radio(["latest", "top"], label="Mode"),
            gr.Slider(1, 1000, step=1, label="Number of Tweets"),
            gr.Textbox(label="Since Date (YYYY-MM-DD)"),
            gr.Textbox(label="Until Date (YYYY-MM-DD)")],
    outputs=gr.DataFrame(),
    title="Scrape Tweets"
)


tabbed_interface = gr.TabbedInterface(
    [video_interface, text_interface, environment_interface, overall_interface, scraper_interface],
    ["Video Sentiment Analysis", "Social Media Sentiment Analysis", "Environment Sentiment Analysis",
     "Overall Sentiment Analysis", "Scrape Tweets"])


tabbed_interface.launch(debug=True, share=True)