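"""Multimodal sentiment dashboard.

Combines three signals behind a tabbed Gradio UI: facial emotion in sampled
video frames (DeepFace), emotion in scraped tweets (a DistilRoBERTa text
classifier via transformers), and an environment rating from sensor-style
readings (a RandomForest trained on synthetic data generated below).
"""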
import csv
import random

import cv2
import gradio as gr
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from deepface import DeepFace
from ntscraper import Nitter
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from transformers import pipeline


def scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date):
    # Shorten httpx's default timeout so unreachable Nitter instances fail fast.
    # (Patching a private attribute is a workaround, not a public API.)
    import httpx
    httpx._config.DEFAULT_TIMEOUT = httpx.Timeout(3.0)

    scraper = Nitter()
    # ntscraper's search mode is fixed to 'hashtag' here; the `mode` value from
    # the UI ("latest"/"top") is accepted for interface compatibility but not used.
    tweets = scraper.get_tweets(
        hashtag,
        mode='hashtag',
        number=num_of_tweets,
        since=since_date,
        until=until_date
    )

    final_tweets = []

    # newline='' keeps csv.writer from emitting blank rows on Windows.
    with open('tweets_kuru.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([f'Scraping Tweets for #{hashtag}'])
        writer.writerow(['User', 'Username', 'Tweet', 'Date'])

        for tweet in tweets['tweets']:
            tweet_details = [tweet['user']['name'], tweet['user']['username'], tweet['text'], tweet['date']]
            writer.writerow(tweet_details)
            final_tweets.append(tweet_details)

    tweet_df = pd.DataFrame(final_tweets, columns=['User', 'Username', 'Tweet', 'Date'])
    return tweet_df


# Synthetic environment readings used to train the RandomForest classifier.
np.random.seed(42)
data_size = 1000
aqi_values = np.random.randint(0, 500, size=data_size)
noise_levels = np.random.randint(30, 110, size=data_size)
temperatures = np.random.randint(-10, 40, size=data_size)
humidity_levels = np.random.randint(10, 90, size=data_size)
pm25_values = np.random.randint(0, 500, size=data_size)
co2_levels = np.random.randint(250, 6000, size=data_size)


def classify_environment(aqi, noise, temp, humidity, pm25, co2):
    # Rule-based label: any reading past the severe threshold marks the whole
    # environment "Bad"; past the milder threshold, "Moderate"; otherwise "Good".
    if aqi > 150 or noise > 80 or temp > 35 or humidity > 80 or pm25 > 55 or co2 > 2000:
        return "Bad"
    elif aqi > 100 or noise > 60 or temp > 30 or humidity > 60 or pm25 > 35 or co2 > 1000:
        return "Moderate"
    else:
        return "Good"


labels = [classify_environment(aqi, noise, temp, humidity, pm25, co2)
          for aqi, noise, temp, humidity, pm25, co2 in
          zip(aqi_values, noise_levels, temperatures, humidity_levels, pm25_values, co2_levels)]

data = pd.DataFrame({
    'AQI': aqi_values,
    'Noise': noise_levels,
    'Temperature': temperatures,
    'Humidity': humidity_levels,
    'PM2.5': pm25_values,
    'CO2': co2_levels,
    'Label': labels
})

X = data[['AQI', 'Noise', 'Temperature', 'Humidity', 'PM2.5', 'CO2']]
y = data['Label']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier()
model.fit(X_train, y_train)

predictions = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, predictions))

joblib.dump(model, 'environment_model.pkl')
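
# The saved model can be reloaded elsewhere with joblib.load, e.g.:
#   model = joblib.load('environment_model.pkl')
#   model.predict(pd.DataFrame([[120, 65, 28, 55, 40, 900]], columns=X.columns))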


def analyze_video_sentiment(video_path, num_frames=10, detector_backend='retinaface'):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Sample at most `num_frames` distinct frames (short clips may have fewer).
    frame_indices = random.sample(range(total_frames), min(num_frames, total_frames))
    # Tallies keyed by DeepFace's emotion labels.
    emotions = {"angry": 0, "disgust": 0, "fear": 0, "happy": 0, "sad": 0, "surprise": 0, "neutral": 0}
    frame_images = []

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            continue

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        try:
            results = DeepFace.analyze(rgb_frame, actions=["emotion"], enforce_detection=True,
                                       detector_backend=detector_backend)

            for result in results:
                if not result:
                    continue

                # Draw a box and the dominant emotion on each detected face.
                region = result["region"]
                x1, y1 = region["x"], region["y"]
                x2, y2 = region["x"] + region["w"], region["y"] + region["h"]
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                dominant_emotion = result["dominant_emotion"]
                cv2.putText(frame, dominant_emotion, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

                if dominant_emotion in emotions:
                    emotions[dominant_emotion] += 1

        except ValueError:
            # With enforce_detection=True, DeepFace raises ValueError when no
            # face is found in a frame; skip such frames.
            continue

        frame_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frame_images.append(frame_image)

    cap.release()

    dominant_emotion = max(emotions, key=emotions.get)

    return dominant_emotion, frame_images
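
# Example (hypothetical local file):
#   top_emotion, annotated_frames = analyze_video_sentiment("clip.mp4", num_frames=10)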


# Emotion classifier for tweet text; return_all_scores=True yields every
# label's score per input so the top emotion can be picked manually.
# (Newer transformers versions prefer top_k=None, which returns the same structure.)
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base",
                      return_all_scores=True)


def classify_tweets(tweets_df):
    # Return the (empty) frame and no plot so callers can still unpack two values.
    if tweets_df.empty:
        return tweets_df, None

    tweet_texts = tweets_df['Tweet'].tolist()
    results = classifier(tweet_texts)
    # Keep the highest-scoring emotion label for each tweet.
    emotions = [max(result, key=lambda x: x['score'])['label'] for result in results]
    tweet_df = tweets_df.copy()
    tweet_df['Sentiment'] = emotions

    # Bar chart of the sentiment distribution.
    sentiment_counts = tweet_df['Sentiment'].value_counts()
    fig, ax = plt.subplots(figsize=(8, 5))
    bars = ax.bar(sentiment_counts.index, sentiment_counts.values,
                  color=['#FF6F61', '#6B5B95', '#88B04B', '#F7CAC9', '#92A8D1', '#955251'])

    ax.set_xlabel('Sentiment', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Count', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Tweet Sentiment Distribution', fontsize=18, fontweight='bold', color='#2E4053')
    ax.tick_params(axis='x', rotation=0, colors='#34495E', labelsize=12)
    ax.tick_params(axis='y', colors='#34495E', labelsize=12)
    for spine in ax.spines.values():
        spine.set_visible(False)
    ax.yaxis.grid(True, linestyle='--', which='major', color='grey', alpha=.45)
    ax.xaxis.set_tick_params(width=0)

    # Annotate each bar with its count.
    for bar in bars:
        yval = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2, yval + 0.01, int(yval), ha='center', va='bottom',
                color='#34495E', fontsize=12, fontweight='bold')

    plt.tight_layout()
    return tweet_df, fig


def classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date):
    tweet_df = scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date)
    tweet_df, fig = classify_tweets(tweet_df)
    return tweet_df, fig
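

# These score mappings are assumed placeholders: classify_overall_sentiment
# below expects video_sentiment_score, text_emotion_score and
# environment_score, and the 0-1 values here are arbitrary; tune them to taste.
def video_sentiment_score(sentiment):
    # Keys follow DeepFace's emotion labels; unknown values score neutral.
    scores = {"happy": 1.0, "surprise": 0.75, "neutral": 0.5, "sad": 0.25,
              "fear": 0.25, "disgust": 0.0, "angry": 0.0}
    return scores.get(sentiment, 0.5)


def text_emotion_score(emotion):
    # Keys follow the DistilRoBERTa emotion model's labels.
    scores = {"joy": 1.0, "surprise": 0.75, "neutral": 0.5, "sadness": 0.25,
              "fear": 0.25, "disgust": 0.0, "anger": 0.0}
    return scores.get(emotion, 0.5)


def environment_score(label):
    scores = {"Good": 1.0, "Moderate": 0.5, "Bad": 0.0}
    return scores.get(label, 0.5)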


def classify_overall_sentiment(video, hashtag, mode, num_of_tweets, since_date, until_date, aqi, noise, temp,
                               humidity, pm25, co2):
    # Defaults used when an input is missing or a step fails.
    video_sentiment = "No video analyzed"
    frame_images = []
    text_emotion = "No text analyzed"
    environment = "No environment data"
    tweet_df = pd.DataFrame()
    plot = None

    if video:
        video_sentiment, frame_images = analyze_video_sentiment(video)

    if hashtag and since_date and until_date:
        try:
            tweet_df, plot = classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date)
            if not tweet_df.empty:
                text_emotion = tweet_df['Sentiment'].value_counts().idxmax()
        except Exception as e:
            print(f"Error in social media analysis: {e}")
            text_emotion = "Error in social media analysis"

    if all(v is not None for v in (aqi, noise, temp, humidity, pm25, co2)):
        environment = classify_environment(aqi, noise, temp, humidity, pm25, co2)

    overall_sentiment = f"Video Sentiment: {video_sentiment}, Environment Sentiment: {environment}"

    if text_emotion not in ("No text analyzed", "Error in social media analysis"):
        overall_sentiment += f", Text Emotion: {text_emotion}"

    # Plot one score per modality (see the score helpers above).
    sentiments = ["Video Sentiment", "Text Emotion", "Environment Sentiment"]
    scores = [video_sentiment_score(video_sentiment), text_emotion_score(text_emotion), environment_score(environment)]

    fig, ax = plt.subplots()
    ax.plot(sentiments, scores, marker='o')

    ax.set_xlabel('Sentiment Source', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Sentiment Score', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Overall Sentiment Scores', fontsize=18, fontweight='bold', color='#2E4053')

    plt.tight_layout()

    return overall_sentiment, frame_images, tweet_df, plot, fig


video_interface = gr.Interface(
    fn=analyze_video_sentiment,
    inputs=[gr.Video(),
            gr.Slider(minimum=1, maximum=20, step=1, value=10, label="Number of Frames"),
            gr.Radio(["retinaface", "mtcnn", "opencv", "ssd", "dlib", "mediapipe"], label="Detector Backend",
                     value="retinaface")],
    outputs=["text", gr.Gallery(label="Analyzed Frames")],
    title="Video Sentiment Analysis"
)


text_interface = gr.Interface(
    fn=classify_and_plot,
    inputs=[gr.Textbox(label="Hashtag"),
            gr.Radio(["latest", "top"], label="Mode"),
            gr.Slider(1, 1000, step=1, label="Number of Tweets"),
            gr.Textbox(label="Since Date (YYYY-MM-DD)"),
            gr.Textbox(label="Until Date (YYYY-MM-DD)")],
    outputs=[gr.DataFrame(label="Scraped Tweets"), gr.Plot()],
    title="Social Media Sentiment Analysis"
)


environment_interface = gr.Interface(
    fn=classify_environment,
    inputs=[gr.Slider(minimum=0, maximum=500, step=1, label="AQI"),
            gr.Slider(minimum=0, maximum=110, step=1, label="Noise"),
            gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"),
            gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"),
            gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"),
            gr.Slider(minimum=250, maximum=6000, step=1, label="CO2")],
    outputs="text",
    title="Environment Sentiment Analysis"
)


# The twelve inputs here line up one-to-one with classify_overall_sentiment's parameters.
overall_interface = gr.Interface(
    fn=classify_overall_sentiment,
    inputs=[gr.Video(),
            gr.Textbox(label="Hashtag"), gr.Radio(["latest", "top"], label="Mode"),
            gr.Slider(1, 1000, step=1, label="Number of Tweets"),
            gr.Textbox(label="Since Date (YYYY-MM-DD)"), gr.Textbox(label="Until Date (YYYY-MM-DD)"),
            gr.Slider(minimum=0, maximum=500, step=1, label="AQI"),
            gr.Slider(minimum=0, maximum=110, step=1, label="Noise"),
            gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"),
            gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"),
            gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"),
            gr.Slider(minimum=250, maximum=6000, step=1, label="CO2")],
    outputs=["text", gr.Gallery(), gr.DataFrame(), gr.Plot(), gr.Plot()],
    title="Overall Sentiment Analysis"
)


scraper_interface = gr.Interface(
    fn=scrape_tweets,
    inputs=[gr.Textbox(label="Hashtag"),
            gr.Radio(["latest", "top"], label="Mode"),
            gr.Slider(1, 1000, step=1, label="Number of Tweets"),
            gr.Textbox(label="Since Date (YYYY-MM-DD)"),
            gr.Textbox(label="Until Date (YYYY-MM-DD)")],
    outputs=gr.DataFrame(),
    title="Scrape Tweets"
)


tabbed_interface = gr.TabbedInterface(
    [video_interface, text_interface, environment_interface, overall_interface, scraper_interface],
    ["Video Sentiment Analysis", "Social Media Sentiment Analysis", "Environment Sentiment Analysis",
     "Overall Sentiment Analysis", "Scrape Tweets"])

if __name__ == "__main__":
    tabbed_interface.launch(debug=True, share=True)