|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify |
|
from flask_sqlalchemy import SQLAlchemy |
|
from sqlalchemy import select, Column, exists, or_, Table, ForeignKey, Integer, String, DateTime, Text, and_, alias |
|
from sqlalchemy.orm import aliased, relationship |
|
from flask_login import UserMixin, LoginManager, login_user, login_required, logout_user, current_user |
|
from flask_wtf.csrf import generate_csrf |
|
from werkzeug.security import generate_password_hash, check_password_hash |
|
from werkzeug.utils import secure_filename |
|
from datetime import datetime |
|
from flask_migrate import Migrate |
|
import uuid |
|
import os |
|
from sqlalchemy import or_, func |
|
|
|
|
|
app = Flask("Simplz") |
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db' |
|
app.config['SECRET_KEY'] = 'your_secret_key' |
|
db = SQLAlchemy(app) |
|
migrate = Migrate(app, db) |
|
|
|
app.config['UPLOAD_FOLDER'] = 'static/users/uploaded_images' |
|
app.config['ALLOWED_EXTENSIONS'] = {'png', 'jpg', 'jpeg', 'gif'} |
|
|
|
followers = db.Table( |
|
'followers', |
|
db.Column('follower_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), |
|
db.Column('followed_id', db.Integer, db.ForeignKey('user.id'), primary_key=True), |
|
extend_existing=True |
|
) |
|
|
|
class Comment(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
content = db.Column(db.Text, nullable=False) |
|
timestamp = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) |
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
|
post_id = db.Column(db.Integer, db.ForeignKey('post.id'), nullable=False) |
|
author = db.relationship('User', backref='comments', lazy=True) |
|
|
|
|
|
class Like(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
|
post_id = db.Column(db.Integer, db.ForeignKey('post.id', ondelete='CASCADE'), nullable=False) |
|
liked_at = db.Column(db.DateTime, default=datetime.utcnow) |
|
|
|
|
|
class Follow(db.Model): |
|
__tablename__ = 'followers' |
|
follower_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) |
|
followed_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) |
|
|
|
follower = db.relationship('User', foreign_keys=[follower_id], backref='following') |
|
followed = db.relationship('User', foreign_keys=[followed_id], backref='followers') |
|
|
|
__table_args__ = {'extend_existing': True} |
|
|
|
class Message(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
sender_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
|
recipient_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
|
content = db.Column(db.Text, nullable=False) |
|
timestamp = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) |
|
sender = db.relationship('User', foreign_keys=[sender_id], backref='sent_messages') |
|
recipient = db.relationship('User', foreign_keys=[recipient_id], backref='received_messages') |
|
|
|
|
|
|
|
class User(UserMixin, db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
username = db.Column(db.String(100), unique=True, nullable=False) |
|
email = db.Column(db.String(100), unique=True, nullable=False) |
|
password = db.Column(db.String(100), nullable=False) |
|
first_name = db.Column(db.String(50), default="Per") |
|
last_name = db.Column(db.String(50), default="Son") |
|
profession = db.Column(db.String(100), default="Person") |
|
location = db.Column(db.String(100), default="Earth") |
|
bio = db.Column(db.Text, default="Hello there...") |
|
profile_pic = db.Column(db.String(100), default='default.jpg') |
|
posts = db.relationship('Post', backref='author', lazy=True) |
|
|
|
following_list = db.relationship( |
|
'User', |
|
secondary=followers, |
|
primaryjoin=(followers.c.follower_id == id), |
|
secondaryjoin=(followers.c.followed_id == id), |
|
backref=db.backref('followers_of', lazy='dynamic'), |
|
lazy='dynamic', |
|
overlaps="follower,followers_of,following,following_list" |
|
) |
|
|
|
followers_list = db.relationship( |
|
'User', |
|
secondary=followers, |
|
primaryjoin=(followers.c.followed_id == id), |
|
secondaryjoin=(followers.c.follower_id == id), |
|
backref=db.backref('following_by', lazy='dynamic'), |
|
lazy='dynamic', |
|
overlaps="followed,followers,followers_of,following_list" |
|
) |
|
|
|
def is_following(self, user): |
|
return self.following_list.filter_by(id=user.id).first() is not None |
|
|
|
def follow(self, user): |
|
if not self.is_following(user) and self.id != user.id: |
|
self.following_list.append(user) |
|
db.session.commit() |
|
|
|
def unfollow(self, user): |
|
if self.is_following(user): |
|
self.following_list.remove(user) |
|
db.session.commit() |
|
|
|
def like_post(self, post): |
|
if not self.has_liked_post(post): |
|
like = Like(user_id=self.id, post_id=post.id) |
|
db.session.add(like) |
|
db.session.commit() |
|
|
|
def unlike_post(self, post): |
|
like = Like.query.filter_by(user_id=self.id, post_id=post.id).first() |
|
if like: |
|
db.session.delete(like) |
|
db.session.commit() |
|
|
|
def has_liked_post(self, post): |
|
return Like.query.filter_by(user_id=self.id, post_id=post.id).count() > 0 |
|
|
|
def add_comment(self, post, content): |
|
comment = Comment(user_id=self.id, post_id=post.id, content=content) |
|
db.session.add(comment) |
|
db.session.commit() |
|
|
|
def delete_comment(self, comment): |
|
if comment.user_id == self.id: |
|
db.session.delete(comment) |
|
db.session.commit() |
|
|
|
|
|
|
|
class Post(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
content = db.Column(db.Text, nullable=False) |
|
created_at = db.Column(db.DateTime, default=datetime.utcnow) |
|
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) |
|
filename = db.Column(db.String(100)) |
|
comments = db.relationship('Comment', backref='post', lazy=True, cascade='all, delete-orphan') |
|
likes = db.relationship('Like', backref='post', lazy=True) |
|
@property |
|
def likes_count(self): |
|
return Like.query.filter_by(post_id=self.id).count() |
|
|
|
|
|
login_manager = LoginManager(app) |
|
login_manager.login_view = 'login' |
|
|
|
def allowed_file(filename): |
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS'] |
|
|
|
@login_manager.user_loader |
|
def load_user(user_id): |
|
return db.session.get(User, int(user_id)) |
|
|
|
|
|
@app.route('/') |
|
@login_required |
|
def index(): |
|
followed_user = aliased(User, name='followed_user') |
|
|
|
own_posts_query = Post.query.filter_by(user_id=current_user.id) |
|
|
|
followed_posts_query = Post.query.join( |
|
Follow, |
|
or_( |
|
and_(Post.user_id == Follow.followed_id, Follow.follower_id == current_user.id), |
|
and_(Post.user_id == Follow.follower_id, Follow.followed_id == current_user.id) |
|
) |
|
) |
|
|
|
combined_posts_query = own_posts_query.union_all(followed_posts_query) |
|
posts = combined_posts_query.order_by(Post.created_at.desc()).all() |
|
|
|
csrf_token = request.environ.get('HTTP_X_CSRFTOKEN') |
|
|
|
return render_template('index.html', posts=posts, csrf_token=csrf_token) |
|
|
|
|
|
|
|
@app.route('/register', methods=['GET', 'POST']) |
|
def register(): |
|
if request.method == 'POST': |
|
email = request.form['email'] |
|
username = request.form['username'] |
|
password = request.form['password'] |
|
password2 = request.form['password2'] |
|
|
|
if password != password2: |
|
flash('Passwords do not match. Please try again.', 'error') |
|
return render_template('register.html') |
|
|
|
existing_user_with_username = User.query.filter_by(username=username).first() |
|
existing_user_with_email = User.query.filter_by(email=email).first() |
|
|
|
if existing_user_with_username: |
|
flash('Username already exists. Please choose a different username.', 'error') |
|
return render_template('register.html') |
|
|
|
if existing_user_with_email: |
|
flash('Email address already registered. Please use a different email.', 'error') |
|
return render_template('register.html') |
|
|
|
hashed_password = generate_password_hash(password, method='scrypt') |
|
new_user = User(username=username, email=email, password=hashed_password) |
|
db.session.add(new_user) |
|
db.session.commit() |
|
|
|
flash('Account created successfully! Please log in.', 'success') |
|
return redirect(url_for('login')) |
|
|
|
return render_template('register.html') |
|
|
|
@app.route('/login', methods=['GET', 'POST']) |
|
def login(): |
|
if request.method == 'POST': |
|
username = request.form['username'] |
|
password = request.form['password'] |
|
user = User.query.filter_by(username=username).first() |
|
if user and check_password_hash(user.password, password): |
|
login_user(user) |
|
return redirect(url_for('index')) |
|
else: |
|
flash('Invalid username or password. Fields are case sensitive.', 'error') |
|
return render_template('login.html') |
|
|
|
@app.route('/logout') |
|
@login_required |
|
def logout(): |
|
logout_user() |
|
return redirect(url_for('index')) |
|
|
|
@app.route('/create_post', methods=['POST']) |
|
@login_required |
|
def create_post(): |
|
content = request.form['content'] |
|
image = request.files['image'] |
|
|
|
if image and allowed_file(image.filename): |
|
filename = str(uuid.uuid4()) + secure_filename(image.filename) |
|
image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) |
|
else: |
|
filename = None |
|
|
|
new_post = Post(content=content, author=current_user, filename=filename) |
|
db.session.add(new_post) |
|
db.session.commit() |
|
return redirect(url_for('index')) |
|
|
|
@app.route('/delete_post/<int:post_id>', methods=['POST']) |
|
@login_required |
|
def delete_post(post_id): |
|
post = db.session.get(Post, post_id) |
|
if post and post.author == current_user: |
|
if post.filename: |
|
try: |
|
os.remove(os.path.join(app.config['UPLOAD_FOLDER'], post.filename)) |
|
except Exception as e: |
|
flash(f"Error deleting image file: {str(e)}", "error") |
|
|
|
|
|
Like.query.filter_by(post_id=post_id).delete() |
|
db.session.delete(post) |
|
db.session.commit() |
|
return redirect(url_for('index')) |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/follow/<int:user_id>', methods=['POST']) |
|
@login_required |
|
def follow(user_id): |
|
|
|
user_to_follow = User.query.get(user_id) |
|
if user_to_follow is None: |
|
flash('User not found.', 'error') |
|
return redirect(url_for('explore')) |
|
|
|
if current_user.is_following(user_to_follow): |
|
flash('You are already following this user.', 'info') |
|
return redirect(url_for('view_profile', user_id=user_id)) |
|
|
|
try: |
|
current_user.follow(user_to_follow) |
|
except Exception as e: |
|
flash('Failed to follow the user. Please try again.', 'error') |
|
print(f"Error: {str(e)}") |
|
return redirect(url_for('view_profile', user_id=user_id)) |
|
|
|
flash(f"You are now following {user_to_follow.username}.", 'success') |
|
return redirect(url_for('view_profile', user_id=user_id)) |
|
|
|
|
|
@app.route('/unfollow/<int:user_id>', methods=['POST']) |
|
@login_required |
|
def unfollow(user_id, page): |
|
user_to_unfollow = User.query.get(user_id) |
|
if user_to_unfollow is None: |
|
flash('User not found.', 'danger') |
|
return redirect(url_for('index')) |
|
|
|
if current_user.is_following(user_to_unfollow): |
|
current_user.unfollow(user_to_unfollow) |
|
flash('You have unfollowed {}.'.format(user_to_unfollow.username), 'success') |
|
else: |
|
flash('You are not following this user.', 'info') |
|
if page == 'profile': |
|
return redirect(url_for('view_profile', user_id=user_id)) |
|
elif page == 'search': |
|
return redirect(url_for('search_results', query=request.form['query'])) |
|
else: |
|
return redirect(url_for('view_profile', user_id=user_id)) |
|
|
|
|
|
@app.route('/account', methods=['GET', 'POST']) |
|
@login_required |
|
def account(): |
|
if request.method == 'POST': |
|
current_user.username = request.form['username'] |
|
current_user.email = request.form['email'] |
|
current_user.first_name = request.form['first_name'] |
|
current_user.last_name = request.form['last_name'] |
|
current_user.bio = request.form['bio'] |
|
current_user.location = request.form['location'] |
|
current_user.profession = request.form['profession'] |
|
|
|
profile_picture = request.files['profile_picture'] |
|
if profile_picture and allowed_file(profile_picture.filename): |
|
filename = str(uuid.uuid4()) + secure_filename(profile_picture.filename) |
|
profile_picture.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) |
|
|
|
if current_user.profile_pic is not None and current_user.profile_pic != 'default.jpg': |
|
old_profile_picture_path = os.path.join(app.config['UPLOAD_FOLDER'], current_user.profile_pic) |
|
os.remove(old_profile_picture_path) |
|
|
|
current_user.profile_pic = filename |
|
|
|
new_password = request.form['new_password'] |
|
if new_password: |
|
current_user.password = generate_password_hash(new_password, method='scrypt') |
|
|
|
db.session.commit() |
|
flash('Profile updated successfully!', 'success') |
|
return redirect(url_for('account')) |
|
|
|
return render_template('account.html') |
|
|
|
@app.route('/search_user', methods=['GET']) |
|
def search_user(): |
|
search_query = request.args.get('search_query', '') |
|
users = User.query.filter( |
|
|
|
or_( |
|
User.username.ilike(f'%{search_query}%'), |
|
User.first_name.ilike(f'%{search_query}%'), |
|
User.last_name.ilike(f'%{search_query}%'), |
|
(User.first_name + User.last_name).like(f'%{search_query}%') |
|
) |
|
).all() |
|
|
|
|
|
|
|
csrf_token = generate_csrf() |
|
return render_template('search_results.html', users=users, csrf_token=csrf_token, searchq=search_query ) |
|
|
|
@app.route('/user/<int:user_id>') |
|
@login_required |
|
def view_profile(user_id): |
|
user = User.query.get(user_id) |
|
if not user: |
|
flash('User not found.', 'error') |
|
return redirect(url_for('search_user')) |
|
|
|
posts = Post.query.filter_by(user_id=user.id).order_by(Post.created_at.desc()).all() |
|
csrf_token = generate_csrf() |
|
return render_template('user_profile.html', user=user, posts=posts, csrf_token=csrf_token, profile_user_id=user_id) |
|
|
|
@app.route('/add_comment/<int:post_id>', methods=['POST']) |
|
@login_required |
|
def add_comment(post_id): |
|
content = request.form['content'] |
|
post = Post.query.get(post_id) |
|
if not post: |
|
flash('Post not found.', 'error') |
|
return redirect(url_for('index')) |
|
current_user.add_comment(post, content) |
|
|
|
flash('Comment added successfully.', 'success') |
|
return redirect(url_for('index')) |
|
|
|
@login_required |
|
@app.route('/like_post/<int:post_id>', methods=['POST']) |
|
def like_post(post_id): |
|
post = Post.query.get(post_id) |
|
if not post: |
|
flash('Post not found.', 'error') |
|
return redirect(url_for('index')) |
|
|
|
if current_user.has_liked_post(post): |
|
current_user.unlike_post(post) |
|
db.session.commit() |
|
liked = False |
|
else: |
|
current_user.like_post(post) |
|
db.session.commit() |
|
liked = True |
|
|
|
likes_count = post.likes_count |
|
response_data = {'likes_count': likes_count, 'liked': liked} |
|
return jsonify(response_data) |
|
|
|
|
|
@app.route('/send_message', methods=['POST']) |
|
@login_required |
|
def send_message(): |
|
recipient_username = request.form['recipient_username'] |
|
message_content = request.form['message_content'] |
|
|
|
recipient = User.query.filter_by(username=recipient_username).first() |
|
if not recipient: |
|
flash('Recipient not found.', 'error') |
|
return redirect(url_for('conversations')) |
|
|
|
new_message = Message(sender_id=current_user.id, recipient_id=recipient.id, content=message_content) |
|
db.session.add(new_message) |
|
db.session.commit() |
|
|
|
flash('Message sent successfully!', 'success') |
|
return redirect(url_for('conversation', recipient_id=recipient.id)) |
|
|
|
|
|
@app.route('/conversations') |
|
@login_required |
|
def conversations(): |
|
conversations = db.session.query(User).join( |
|
Message, or_( |
|
and_(Message.sender_id == current_user.id, Message.recipient_id == User.id), |
|
and_(Message.sender_id == User.id, Message.recipient_id == current_user.id) |
|
) |
|
).distinct(User.id).all() |
|
|
|
return render_template('conversations.html', conversations=conversations) |
|
|
|
@app.route('/new_conversation', methods=['GET']) |
|
@login_required |
|
def new_conversation(): |
|
return render_template('new_conversation.html') |
|
|
|
|
|
|
|
@app.route('/conversation/<int:recipient_id>', methods=['GET', 'POST']) |
|
@login_required |
|
def conversation(recipient_id): |
|
recipient = User.query.get(recipient_id) |
|
if not recipient: |
|
flash('Recipient not found.', 'error') |
|
return redirect(url_for('conversations')) |
|
|
|
if request.method == 'POST': |
|
message_content = request.form['message_content'] |
|
|
|
new_message = Message(sender_id=current_user.id, recipient_id=recipient.id, content=message_content) |
|
db.session.add(new_message) |
|
db.session.commit() |
|
|
|
flash('Message sent successfully!', 'success') |
|
return redirect(url_for('conversation', recipient_id=recipient_id)) |
|
|
|
messages = Message.query.filter(or_( |
|
and_(Message.sender_id == current_user.id, Message.recipient_id == recipient_id), |
|
and_(Message.sender_id == recipient_id, Message.recipient_id == current_user.id) |
|
)).order_by(Message.timestamp).all() |
|
|
|
return render_template('conversation.html', recipient=recipient, messages=messages) |
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
with app.app_context(): |
|
db.create_all() |
|
app.run(debug=True, use_reloader=False, host="0.0.0.0", port=7860) |
|
|