# batlahiya's picture
# Update t1.py
# bdad9ba verified
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
import joblib
import cv2
from deepface import DeepFace
import random
import gradio as gr
import matplotlib.pyplot as plt
from transformers import pipeline
from PIL import Image
from ntscraper import Nitter
import csv
def video_sentiment_score(sentiment):
    """Map a facial-emotion label to a signed sentiment score.

    Args:
        sentiment: Emotion label, e.g. ``"happy"`` or ``"surprise"``.

    Returns:
        The float score for the label: positive for pleasant emotions,
        negative for unpleasant ones, and ``0.0`` for ``"neutral"`` or for
        any label that is not in the table.
    """
    sentiment_mapping = {
        "happy": 1.0,
        "sad": -1.0,
        "angry": -1.5,
        # DeepFace names this emotion "surprise"; the older "surprised"
        # spelling is kept so existing callers keep getting the same score.
        "surprise": 0.5,
        "surprised": 0.5,
        "neutral": 0.0,
        # Remaining DeepFace labels, weighted like text_emotion_score does.
        "disgust": -1.5,
        "fear": -1.0,
    }
    return sentiment_mapping.get(sentiment, 0.0)
def text_emotion_score(emotion):
    """Return the signed sentiment score for a text-emotion label.

    Labels that are not in the table (for example a placeholder string used
    when no text was analyzed) score ``0.0``.
    """
    scores = dict(
        joy=1.0,
        sadness=-1.0,
        anger=-1.5,
        surprise=0.5,
        neutral=0.0,
        disgust=-1.5,
        fear=-1.0,
    )
    if emotion in scores:
        return scores[emotion]
    return 0.0
def environment_score(environment):
    """Return the score for an environment label.

    ``"Good"`` scores 1.0, ``"Moderate"`` 0.0 and ``"Bad"`` -1.0; any other
    value scores 0.0.
    """
    score_by_label = {"Good": 1.0, "Moderate": 0.0, "Bad": -1.0}
    if environment in score_by_label:
        return score_by_label[environment]
    return 0.0
# Scraper function
def scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date):
    """Scrape tweets for a hashtag and save them to ``tweets_kuru.csv``.

    Args:
        hashtag: Hashtag to search for; the CSV title row prefixes it
            with ``#``.
        mode: Accepted for interface compatibility but not used; the search
            always runs with the scraper's ``'hashtag'`` mode.
        num_of_tweets: Number of tweets to request; converted with ``int()``.
        since_date: Passed to the scraper as ``since``.
        until_date: Passed to the scraper as ``until``.

    Returns:
        A DataFrame with the columns ``User``, ``Username``, ``Tweet`` and
        ``Date``, one row per scraped tweet (empty when nothing was found).
    """
    print(f"num_of_tweets before conversion: {num_of_tweets}")
    num_of_tweets = int(num_of_tweets)

    # NOTE(review): this overrides a private httpx default (3 s timeout) for
    # the whole process; confirm the scraper actually goes through httpx.
    import httpx
    httpx._config.DEFAULT_TIMEOUT = httpx.Timeout(3.0)

    scraper = Nitter()
    tweets = scraper.get_tweets(
        hashtag,
        mode='hashtag',
        number=num_of_tweets,
        since=since_date,
        until=until_date
    )

    columns = ['User', 'Username', 'Tweet', 'Date']
    # Build each row once and reuse it for both the CSV and the DataFrame.
    # A missing result or missing 'tweets' key is treated as "no tweets".
    final_tweets = [
        [tweet['user']['name'], tweet['user']['username'], tweet['text'], tweet['date']]
        for tweet in (tweets or {}).get('tweets', [])
    ]

    # newline='' lets the csv module control line endings itself; without it
    # some platforms end up with a blank line after every row.
    with open('tweets_kuru.csv', 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([f'Scraping Tweets for #{hashtag}'])
        writer.writerow(columns)
        writer.writerows(final_tweets)

    return pd.DataFrame(final_tweets, columns=columns)
# Simulated sensor data
# Seed NumPy's global RNG so the arrays below are the same on every run.
np.random.seed(42)
data_size = 1000
# Integer readings drawn from [low, high). The order of these calls is part
# of the result: each draw advances the shared RNG state.
aqi_values = np.random.randint(low=0, high=500, size=data_size)
noise_levels = np.random.randint(low=30, high=110, size=data_size)
temperatures = np.random.randint(low=-10, high=40, size=data_size)
humidity_levels = np.random.randint(low=10, high=90, size=data_size)
pm25_values = np.random.randint(low=0, high=500, size=data_size)
co2_levels = np.random.randint(low=250, high=6000, size=data_size)
def classify_environment(aqi, noise, temp, humidity, pm25, co2):
    """Classify a set of sensor readings as "Bad", "Moderate" or "Good".

    A reading counts against a level only when it is strictly greater than
    that level's limit. Any reading above its "Bad" limit yields "Bad";
    otherwise any reading above its "Moderate" limit yields "Moderate";
    otherwise the result is "Good".
    """
    readings = (aqi, noise, temp, humidity, pm25, co2)
    # Limits are listed in the same order as `readings`.
    bad_limits = (150, 80, 35, 80, 55, 2000)
    moderate_limits = (100, 60, 30, 60, 35, 1000)

    if any(value > limit for value, limit in zip(readings, bad_limits)):
        return "Bad"
    if any(value > limit for value, limit in zip(readings, moderate_limits)):
        return "Moderate"
    return "Good"
# Label every simulated reading by feeding the six sensor arrays to
# classify_environment in lockstep.
labels = list(map(
    classify_environment,
    aqi_values, noise_levels, temperatures, humidity_levels, pm25_values, co2_levels,
))

# Assemble the readings and their labels into one table.
data = pd.DataFrame(data={
    'AQI': aqi_values,
    'Noise': noise_levels,
    'Temperature': temperatures,
    'Humidity': humidity_levels,
    'PM2.5': pm25_values,
    'CO2': co2_levels,
    'Label': labels,
})
# Train a simple classification model on the simulated readings.
X = data.loc[:, ['AQI', 'Noise', 'Temperature', 'Humidity', 'PM2.5', 'CO2']]
y = data['Label']
# Hold out 20% of the rows for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)
model = RandomForestClassifier().fit(X_train, y_train)
predictions = model.predict(X_test)
print("Accuracy:", accuracy_score(y_test, predictions))
# Persist the fitted model to disk.
joblib.dump(model, 'environment_model.pkl')
def _is_no_face_error(error):
    """Return True when *error* reads like a face-detection failure message."""
    message = str(error).lower()
    return "could not be detected" in message or "no face detected" in message


# Function to analyze video sentiment
def analyze_video_sentiment(video_path, num_frames=10, detector_backend='retinaface'):
    """Estimate the dominant facial emotion in randomly sampled video frames.

    Args:
        video_path: Path of the video, passed to ``cv2.VideoCapture``.
        num_frames: How many frames to sample at random. Converted with
            ``int()`` and capped at the number of frames in the video.
        detector_backend: Face detector name passed to ``DeepFace.analyze``.

    Returns:
        A ``(dominant_emotion, frame_images)`` tuple. ``dominant_emotion`` is
        one of ``"happy"``, ``"sad"``, ``"angry"``, ``"surprised"`` or
        ``"neutral"``; it is ``"neutral"`` when no face with one of those
        emotions was found. ``frame_images`` is a list of PIL images of the
        frames in which a face was analyzed, annotated with a bounding box
        and the detected emotion label.

    Raises:
        ValueError: Re-raised from ``DeepFace.analyze`` when the error is not
            a face-detection failure.
    """
    # DeepFace names the emotion "surprise"; it is tallied under the
    # "surprised" key so the returned vocabulary stays what callers expect.
    label_aliases = {"surprise": "surprised"}
    emotions = {"happy": 0, "sad": 0, "angry": 0, "surprised": 0, "neutral": 0}
    frame_images = []

    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # random.sample needs an integer count no larger than the population,
        # so short or unreadable videos must not request more than they have.
        sample_size = max(0, min(int(num_frames), total_frames))
        frame_indices = random.sample(range(total_frames), sample_size)

        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                continue

            # Face detection and emotion analysis. The frame is handed over
            # in OpenCV's native BGR order, which is what DeepFace expects
            # for array input.
            try:
                results = DeepFace.analyze(frame, actions=["emotion"], enforce_detection=True,
                                           detector_backend=detector_backend)
            except ValueError as e:
                # A frame without a detectable face is skipped, not fatal.
                if _is_no_face_error(e):
                    continue
                raise

            # Tolerate a single-dict result as well as a list of results.
            if isinstance(results, dict):
                results = [results]

            for result in results:
                if not result:
                    continue

                # Draw bounding box
                region = result["region"]
                x1, y1 = region["x"], region["y"]
                x2, y2 = x1 + region["w"], y1 + region["h"]
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                # Add emotion label above the bounding box
                dominant_emotion = result["dominant_emotion"]
                cv2.putText(frame, dominant_emotion, (x1 + 5, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                            (0, 0, 255), 2)

                # Update emotion counts
                counted_label = label_aliases.get(dominant_emotion, dominant_emotion)
                if counted_label in emotions:
                    emotions[counted_label] += 1

            # Convert frame to image for Gradio display
            frame_images.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
    finally:
        # Release the capture even if analysis raised. No HighGUI window is
        # ever opened here, so there is nothing else to tear down.
        cap.release()

    # With every count at zero, max() would just return the first key;
    # report "neutral" instead of claiming an emotion that was never seen.
    if not any(emotions.values()):
        return "neutral", frame_images

    # Return dominant emotion and frame images
    return max(emotions, key=emotions.get), frame_images
# Text-classification pipeline used to label tweets with an emotion.
# return_all_scores=True asks for the score of every label, not just the top one.
classifier = pipeline(
    task="text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,
)
def classify_tweets(tweets_df):
    """Label each tweet with its top-scoring emotion and chart the counts.

    Args:
        tweets_df: DataFrame with a ``'Tweet'`` column holding tweet text.

    Returns:
        A ``(tweet_df, fig)`` tuple. ``tweet_df`` is a copy of ``tweets_df``
        with an added ``'Sentiment'`` column; ``fig`` is a Matplotlib bar
        chart of how often each label occurs. When ``tweets_df`` is empty
        the tuple is ``(tweets_df plus an empty 'Sentiment' column, None)``.
    """
    if tweets_df.empty:
        # Callers unpack two values and read the 'Sentiment' column, so the
        # empty case must keep the same shape instead of returning a string.
        return tweets_df.assign(Sentiment=pd.Series(dtype="object")), None

    tweet_texts = tweets_df['Tweet'].tolist()
    results = classifier(tweet_texts)
    # Each result holds one entry per label; keep the highest-scoring label.
    emotions = [max(result, key=lambda x: x['score'])['label'] for result in results]

    tweet_df = tweets_df.copy()
    tweet_df['Sentiment'] = emotions

    # Plot sentiment distribution
    sentiment_counts = tweet_df['Sentiment'].value_counts()
    fig, ax = plt.subplots(figsize=(8, 5))
    bars = ax.bar(sentiment_counts.index, sentiment_counts.values,
                  color=['#FF6F61', '#6B5B95', '#88B04B', '#F7CAC9', '#92A8D1', '#955251'])
    ax.set_xlabel('Sentiment', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Count', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Tweet Sentiment Distribution', fontsize=18, fontweight='bold', color='#2E4053')
    ax.tick_params(axis='x', rotation=0, colors='#34495E', labelsize=12)
    ax.tick_params(axis='y', colors='#34495E', labelsize=12)
    # Hide the frame; a dashed horizontal grid carries the scale instead.
    for side in ('top', 'right', 'left', 'bottom'):
        ax.spines[side].set_visible(False)
    ax.yaxis.grid(True, linestyle='--', which='major', color='grey', alpha=.45)
    ax.xaxis.set_tick_params(width=0)

    # Write each bar's count just above it.
    for bar in bars:
        yval = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2, yval + 0.01, round(yval, 2), ha='center', va='bottom',
                color='#34495E', fontsize=12, fontweight='bold')
    plt.tight_layout()

    return tweet_df, fig
def classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date):
    """Scrape tweets for a hashtag, then classify and chart them.

    All arguments are forwarded unchanged to ``scrape_tweets``; the scraped
    table is then handed to ``classify_tweets``.

    Returns:
        The ``(tweet_df, fig)`` pair produced by ``classify_tweets``.
    """
    scraped = scrape_tweets(hashtag, mode, num_of_tweets, since_date, until_date)
    labelled, chart = classify_tweets(scraped)
    return labelled, chart
# Function to classify overall sentiment
def classify_overall_sentiment(video, hashtag, mode, num_of_tweets, since_date, until_date, aqi, noise, temp,
                               humidity, pm25, co2):
    """Run the video, tweet and environment analyses and summarise them.

    Args:
        video: Video passed to ``analyze_video_sentiment``.
        hashtag, mode, num_of_tweets, since_date, until_date: Forwarded to
            ``classify_and_plot``.
        aqi, noise, temp, humidity, pm25, co2: Sensor readings forwarded to
            ``classify_environment``.

    Returns:
        A 5-tuple: the summary string, the annotated video frames, the
        labelled tweet table, the tweet chart, and a line plot of the three
        per-source scores.
    """
    # Video: dominant facial emotion plus the annotated frames.
    video_sentiment, frame_images = analyze_video_sentiment(video)

    # Social media: the most frequent tweet label, if there are any tweets.
    tweet_df, plot = classify_and_plot(hashtag, mode, num_of_tweets, since_date, until_date)
    if tweet_df.empty:
        text_emotion = "No text analyzed"
    else:
        text_emotion = tweet_df['Sentiment'].value_counts().idxmax()

    # Environment: label derived from the sensor readings.
    environment = classify_environment(aqi, noise, temp, humidity, pm25, co2)

    overall_sentiment = (
        f"Video Sentiment: {video_sentiment}, "
        f"Environment Sentiment: {environment}, "
        f"Text Emotion: {text_emotion}"
    )

    # One numeric score per source, plotted in this order.
    score_by_source = {
        "Video Sentiment": video_sentiment_score(video_sentiment),
        "Text Emotion": text_emotion_score(text_emotion),
        "Environment Sentiment": environment_score(environment),
    }

    fig, ax = plt.subplots()
    ax.plot(list(score_by_source.keys()), list(score_by_source.values()), marker='o')
    ax.set_xlabel('Sentiment Source', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_ylabel('Sentiment Score', fontsize=14, fontweight='bold', color='#34495E')
    ax.set_title('Overall Sentiment Scores', fontsize=18, fontweight='bold', color='#2E4053')
    plt.tight_layout()

    return overall_sentiment, frame_images, tweet_df, plot, fig
# Create Gradio interfaces
# Video file offered as the pre-filled input of the video tab.
example_video = "TimesSquare.mp4"

# Video tab: inputs map, in order, to analyze_video_sentiment's
# (video_path, num_frames, detector_backend).
video_interface = gr.Interface(
    fn=analyze_video_sentiment,
    inputs=[
        gr.Video(value=example_video),
        gr.Slider(minimum=1, maximum=20, step=1),
        gr.Radio(
            choices=["retinaface", "mtcnn", "opencv", "ssd", "dlib", "mediapipe"],
            label="Detector Backend",
            value="retinaface",
        ),
    ],
    outputs=["text", gr.Gallery(label="Analyzed Frames")],
    title="Video Sentiment Analysis",
)
# Social media tab: scrape tweets for a hashtag, then show the labelled
# table and the label-count chart returned by classify_and_plot.
text_interface = gr.Interface(
    fn=classify_and_plot,
    inputs=[
        gr.Textbox(label="Hashtag"),
        gr.Radio(choices=["latest", "top"], label="Mode"),
        gr.Slider(minimum=1, maximum=1000, step=1, label="Number of Tweets"),
        gr.Textbox(label="Since Date (YYYY-MM-DD)"),
        gr.Textbox(label="Until Date (YYYY-MM-DD)"),
    ],
    outputs=[gr.DataFrame(label="Scraped Tweets"), gr.Plot()],
    title="Social Media Sentiment Analysis",
)
# Environment tab: one slider per sensor reading, in the argument order of
# classify_environment; the output is its text label.
environment_interface = gr.Interface(
    fn=classify_environment,
    inputs=[
        gr.Slider(minimum=0, maximum=500, step=1, label="AQI"),
        gr.Slider(minimum=0, maximum=110, step=1, label="Noise"),
        gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"),
        gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"),
        gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"),
        gr.Slider(minimum=250, maximum=6000, step=1, label="CO2"),
    ],
    outputs="text",
    title="Environment Sentiment Analysis",
)
# Overall tab: video, tweet-search and sensor inputs in the argument order of
# classify_overall_sentiment; outputs follow the order of its returned tuple.
overall_interface = gr.Interface(
    fn=classify_overall_sentiment,
    inputs=[
        gr.Video(),
        gr.Textbox(label="Hashtag"),
        gr.Radio(choices=["latest", "top"], label="Mode"),
        gr.Slider(minimum=1, maximum=1000, step=1, label="Number of Tweets"),
        gr.Textbox(label="Since Date (YYYY-MM-DD)"),
        gr.Textbox(label="Until Date (YYYY-MM-DD)"),
        gr.Slider(minimum=0, maximum=500, step=1, label="AQI"),
        gr.Slider(minimum=0, maximum=110, step=1, label="Noise"),
        gr.Slider(minimum=-10, maximum=50, step=1, label="Temperature"),
        gr.Slider(minimum=0, maximum=100, step=1, label="Humidity"),
        gr.Slider(minimum=0, maximum=500, step=1, label="PM2.5"),
        gr.Slider(minimum=250, maximum=6000, step=1, label="CO2"),
    ],
    outputs=["text", gr.Gallery(), gr.DataFrame(), gr.Plot(), gr.Plot()],
    title="Overall Sentiment Analysis",
)
# Scraper tab: runs scrape_tweets on its own and shows the resulting table.
scraper_interface = gr.Interface(
    fn=scrape_tweets,
    inputs=[
        gr.Textbox(label="Hashtag"),
        gr.Radio(choices=["latest", "top"], label="Mode"),
        gr.Slider(minimum=1, maximum=1000, step=1, label="Number of Tweets"),
        gr.Textbox(label="Since Date (YYYY-MM-DD)"),
        gr.Textbox(label="Until Date (YYYY-MM-DD)"),
    ],
    outputs=gr.DataFrame(),
    title="Scrape Tweets",
)
# Combine the five interfaces into one tabbed layout; tab names are listed in
# the same order as the interfaces they label.
tabbed_interface = gr.TabbedInterface(
    interface_list=[
        video_interface,
        text_interface,
        environment_interface,
        overall_interface,
        scraper_interface,
    ],
    tab_names=[
        "Video Sentiment Analysis",
        "Social Media Sentiment Analysis",
        "Environment Sentiment Analysis",
        "Overall Sentiment Analysis",
        "Scrape Tweets",
    ],
)

# Launch the interface
tabbed_interface.launch(debug=True, share=True)