File size: 6,043 Bytes
4125bc1 808f31e ad35cf2 28038e5 808f31e 28038e5 808f31e 28038e5 808f31e 28038e5 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 28038e5 ad35cf2 28038e5 ad35cf2 28038e5 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e ad35cf2 808f31e 28038e5 808f31e 28038e5 808f31e ad35cf2 28038e5 808f31e ad35cf2 808f31e ad35cf2 808f31e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
##########modules/auth/auth.py
import os
import streamlit as st
from azure.cosmos import CosmosClient, exceptions
from azure.cosmos.exceptions import CosmosHttpResponseError
import bcrypt
import base64
from ..database.sql_db import (
get_user,
get_student_user,
get_admin_user,
create_student_user,
update_student_user,
delete_student_user,
record_login,
record_logout
)
import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
def clean_and_validate_key(key):
"""Limpia y valida la clave de CosmosDB"""
key = key.strip()
while len(key) % 4 != 0:
key += '='
try:
base64.b64decode(key)
return key
except:
raise ValueError("La clave proporcionada no es v谩lida")
# Verificar las variables de entorno
endpoint = os.getenv("COSMOS_ENDPOINT")
key = os.getenv("COSMOS_KEY")
if not endpoint or not key:
raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas")
key = clean_and_validate_key(key)
def authenticate_user(username, password):
"""Autentica un usuario y registra el inicio de sesi贸n"""
try:
user_item = get_user(username)
if not user_item:
logger.warning(f"Usuario no encontrado: {username}")
return False, None
if verify_password(user_item['password'], password):
logger.info(f"Usuario autenticado: {username}, Rol: {user_item['role']}")
try:
session_id = record_login(username)
if session_id:
st.session_state.session_id = session_id
st.session_state.username = username
st.session_state.login_time = datetime.now(timezone.utc).isoformat()
logger.info(f"Sesi贸n iniciada: {session_id}")
else:
logger.warning("No se pudo registrar la sesi贸n")
except Exception as e:
logger.error(f"Error al registrar inicio de sesi贸n: {str(e)}")
return True, user_item['role']
logger.warning(f"Contrase帽a incorrecta para usuario: {username}")
return False, None
except Exception as e:
logger.error(f"Error durante la autenticaci贸n del usuario: {str(e)}")
return False, None
def authenticate_student(username, password):
"""Autentica un estudiante"""
success, role = authenticate_user(username, password)
if success and role == 'Estudiante':
return True, role
return False, None
def authenticate_admin(username, password):
"""Autentica un administrador"""
success, role = authenticate_user(username, password)
if success and role == 'Administrador':
return True, role
return False, None
def register_student(username, password, additional_info=None):
"""Registra un nuevo estudiante"""
try:
if get_student_user(username):
logger.warning(f"Estudiante ya existe: {username}")
return False
hashed_password = hash_password(password)
# Asegurarse que additional_info tenga el rol correcto
if not additional_info:
additional_info = {}
additional_info['role'] = 'Estudiante'
success = create_student_user(username, hashed_password, additional_info)
if success:
logger.info(f"Nuevo estudiante registrado: {username}")
return True
logger.error(f"Error al crear estudiante: {username}")
return False
except Exception as e:
logger.error(f"Error al registrar estudiante: {str(e)}")
return False
def update_student_info(username, new_info):
"""Actualiza la informaci贸n de un estudiante"""
try:
if 'password' in new_info:
new_info['password'] = hash_password(new_info['password'])
success = update_student_user(username, new_info)
if success:
logger.info(f"Informaci贸n actualizada: {username}")
return True
logger.error(f"Error al actualizar: {username}")
return False
except Exception as e:
logger.error(f"Error en actualizaci贸n: {str(e)}")
return False
def delete_student(username):
"""Elimina un estudiante"""
try:
success = delete_student_user(username)
if success:
logger.info(f"Estudiante eliminado: {username}")
return True
logger.error(f"Error al eliminar: {username}")
return False
except Exception as e:
logger.error(f"Error en eliminaci贸n: {str(e)}")
return False
def logout():
"""Cierra la sesi贸n del usuario"""
try:
if 'session_id' in st.session_state and 'username' in st.session_state:
success = record_logout(
st.session_state.username,
st.session_state.session_id
)
if success:
logger.info(f"Sesi贸n cerrada: {st.session_state.username}")
else:
logger.warning(f"Error al registrar cierre de sesi贸n: {st.session_state.username}")
except Exception as e:
logger.error(f"Error en logout: {str(e)}")
finally:
st.session_state.clear()
def hash_password(password):
"""Hashea una contrase帽a"""
return bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt()
).decode('utf-8')
def verify_password(stored_password, provided_password):
"""Verifica una contrase帽a"""
return bcrypt.checkpw(
provided_password.encode('utf-8'),
stored_password.encode('utf-8')
)
__all__ = [
'authenticate_user',
'authenticate_admin',
'authenticate_student',
'register_student',
'update_student_info',
'delete_student',
'logout',
'hash_password',
'verify_password'
] |