Spaces:
Running
Running
import gradio as gr | |
import torch | |
import os | |
import shutil | |
import time | |
import logging | |
import gc | |
import threading | |
import json | |
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig | |
# Настройка логирования | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Проверка наличия ZERO GPU | |
ZERO_GPU_ENABLED = os.environ.get("HF_ZERO_GPU", "0") == "1" | |
logger.info(f"Zero GPU активирован: {ZERO_GPU_ENABLED}") | |
# Получаем API токен из переменных окружения | |
HF_TOKEN = os.environ.get("HF_TOKEN", None) | |
logger.info("API токен найден" if HF_TOKEN else "API токен не найден! Добавьте HF_TOKEN в секреты репозитория") | |
# Информация о системе и CUDA | |
logger.info("===== Запуск приложения =====") | |
logger.info(f"PyTorch: {torch.__version__}") | |
# Проверка CUDA | |
cuda_available = torch.cuda.is_available() | |
logger.info(f"CUDA доступен: {cuda_available}") | |
if cuda_available: | |
logger.info(f"Количество CUDA устройств: {torch.cuda.device_count()}") | |
for i in range(torch.cuda.device_count()): | |
logger.info(f"CUDA устройство {i}: {torch.cuda.get_device_name(i)}") | |
free_mem = torch.cuda.get_device_properties(i).total_memory - torch.cuda.memory_allocated(i) | |
logger.info(f"Устройство {i}: свободно {free_mem / 1024**3:.2f} ГБ") | |
else: | |
logger.info("CUDA недоступен, используется CPU") | |
# Настройка директорий | |
user_home = os.path.expanduser("~") | |
DISK_DIR = os.path.join(user_home, "app_data") | |
CACHE_DIR = os.path.join(DISK_DIR, "models_cache") | |
os.makedirs(CACHE_DIR, exist_ok=True) | |
os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR | |
os.environ["HF_HOME"] = CACHE_DIR | |
logger.info(f"Используем директорию для кэша: {CACHE_DIR}") | |
# Модель | |
model_name = "unsloth/Phi-3.5-mini-instruct" | |
logger.info(f"Выбрана модель: {model_name}") | |
# Глобальные переменные | |
model = None | |
tokenizer = None | |
is_model_loaded = False | |
DEFAULT_EOS_TOKEN = "</s>" | |
# Класс для таймаута | |
class TimeoutManager: | |
def __init__(self, seconds): | |
self.seconds = seconds | |
self.timeout_occurred = False | |
self.timer = None | |
def start(self): | |
self.timeout_occurred = False | |
self.timer = threading.Timer(self.seconds, self._timeout) | |
self.timer.daemon = True | |
self.timer.start() | |
def _timeout(self): | |
self.timeout_occurred = True | |
def stop(self): | |
if self.timer: | |
self.timer.cancel() | |
def check_timeout(self): | |
if self.timeout_occurred: | |
raise TimeoutException("Timeout occurred") | |
class TimeoutException(Exception): | |
pass | |
# Очистка памяти | |
def clear_memory(): | |
if cuda_available: | |
torch.cuda.empty_cache() | |
gc.collect() | |
# Загрузка модели | |
def load_model(): | |
global model, tokenizer, is_model_loaded | |
try: | |
clear_memory() | |
logger.info("Загружаем токенизатор...") | |
tokenizer = AutoTokenizer.from_pretrained( | |
model_name, | |
token=HF_TOKEN, | |
cache_dir=CACHE_DIR, | |
local_files_only=False, | |
revision="main" | |
) | |
if tokenizer.pad_token is None: | |
tokenizer.pad_token = tokenizer.eos_token | |
logger.info(f"Токенизатор загружен: vocab_size={tokenizer.vocab_size}") | |
logger.info("Загружаем конфигурацию модели...") | |
config = AutoConfig.from_pretrained(model_name, token=HF_TOKEN, cache_dir=CACHE_DIR) | |
logger.info(f"Конфигурация модели: {config}") | |
logger.info("Загружаем модель...") | |
model_kwargs = { | |
"cache_dir": CACHE_DIR, | |
"trust_remote_code": True, | |
"token": HF_TOKEN, | |
"config": config | |
} | |
if cuda_available: | |
logger.info("Загружаем модель в режиме GPU...") | |
model_kwargs.update({ | |
"torch_dtype": torch.float16, | |
"device_map": "auto", | |
"load_in_4bit": True # Оптимизация от unsloth | |
}) | |
else: | |
logger.info("Загружаем модель в режиме CPU...") | |
model_kwargs.update({ | |
"torch_dtype": torch.float32, | |
"load_in_4bit": False | |
}) | |
model = AutoModelForCausalLM.from_pretrained( | |
model_name, | |
**model_kwargs | |
) | |
if not cuda_available: | |
model = model.to("cpu") | |
device_info = next(model.parameters()).device | |
logger.info(f"Модель загружена на устройство: {device_info}") | |
is_model_loaded = True | |
return f"Модель загружена на {device_info}" | |
except Exception as e: | |
logger.error(f"Ошибка загрузки модели: {str(e)}") | |
is_model_loaded = False | |
return f"Ошибка загрузки модели: {str(e)}" | |
# Загружаем модель при запуске | |
start_time = time.time() | |
load_result = load_model() | |
logger.info(f"Загрузка заняла {time.time() - start_time:.2f} секунд. Результат: {load_result}") | |
# Шаблон для генерации | |
EOS_TOKEN = tokenizer.eos_token if tokenizer and tokenizer.eos_token else DEFAULT_EOS_TOKEN | |
qa_prompt = "<|user|>{}\n<|assistant|> {}" # Формат для Phi-3.5-mini-instruct | |
# Функция генерации ответа | |
def respond(message, history, system_message, max_tokens, temperature, top_p, generation_timeout): | |
global model, tokenizer, is_model_loaded | |
if not is_model_loaded or model is None or tokenizer is None: | |
return "Модель не загружена. Проверьте логи или добавьте HF_TOKEN." | |
clear_memory() | |
start_time = time.time() | |
# Форматирование истории | |
full_prompt = "" | |
if system_message: | |
full_prompt += qa_prompt.format(system_message, "") + "\n" | |
for user_msg, assistant_msg in history: | |
if user_msg and assistant_msg: | |
full_prompt += qa_prompt.format(user_msg, assistant_msg) + EOS_TOKEN + "\n" | |
full_prompt += qa_prompt.format(message, "") | |
logger.info(f"Генерируем ответ на: '{message[:50]}...'") | |
try: | |
timeout_mgr = TimeoutManager(generation_timeout) | |
timeout_mgr.start() | |
inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device) | |
timeout_mgr.check_timeout() | |
gen_kwargs = { | |
"input_ids": inputs.input_ids, | |
"max_new_tokens": max_tokens, | |
"temperature": temperature, | |
"top_p": top_p, | |
"do_sample": True, | |
"pad_token_id": tokenizer.pad_token_id, | |
} | |
outputs = model.generate(**gen_kwargs) | |
timeout_mgr.stop() | |
response = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
response_start = response.rfind("<|assistant|> ") + len("<|assistant|> ") | |
response = response[response_start:].strip() if response_start >= len("<|assistant|> ") else response.strip() | |
logger.info(f"Генерация заняла {time.time() - start_time:.2f} секунд") | |
return response | |
except TimeoutException: | |
return f"Таймаут генерации ({generation_timeout} секунд)." | |
except Exception as e: | |
logger.error(f"Ошибка генерации: {str(e)}") | |
return f"Ошибка: {str(e)}" | |
finally: | |
if 'timeout_mgr' in locals(): | |
timeout_mgr.stop() | |
# Интерфейс Gradio | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# НереальностьQA - Чат с экспертом по эзотерике") | |
if not HF_TOKEN: | |
gr.Markdown("⚠️ Добавьте HF_TOKEN в секреты репозитория!") | |
with gr.Row(): | |
with gr.Column(scale=4): | |
chatbot = gr.Chatbot(label="Диалог") | |
user_input = gr.Textbox(placeholder="Введите вопрос...", label="Ваш вопрос", lines=2) | |
with gr.Row(): | |
submit_btn = gr.Button("Отправить", variant="primary") | |
clear_btn = gr.Button("Очистить") | |
with gr.Column(scale=1): | |
with gr.Accordion("Настройки", open=False): | |
system_msg = gr.Textbox( | |
value="Твоя задача — дать точный ответ на вопрос пользователя.", | |
label="Системное сообщение", | |
lines=4 | |
) | |
max_tokens = gr.Slider(1, 1024, value=256, step=1, label="Макс. токенов") | |
temperature = gr.Slider(0.1, 1.2, value=0.7, step=0.1, label="Температура") | |
top_p = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top-p") | |
generation_timeout = gr.Slider(10, 300, value=60, step=10, label="Таймаут (с)") | |
with gr.Accordion("Информация", open=True): | |
system_info = { | |
"Модель": model_name, | |
"Режим": "GPU" if cuda_available else "CPU", | |
"Статус": "Успешно" if is_model_loaded else "Ошибка", | |
"API токен": "Настроен" if HF_TOKEN else "Отсутствует" | |
} | |
gr.Markdown("\n".join([f"* **{k}**: {v}" for k, v in system_info.items()])) | |
with gr.Accordion("Примеры вопросов", open=True): | |
gr.Examples( | |
examples=[ | |
"Что известно о мире отшедших душ?", | |
"Что такое энергетическая ось человека?", | |
"Роль энергии мысли в мире отшедших?" | |
], | |
inputs=user_input | |
) | |
def chat(message, history): | |
if not message: | |
return history, "" | |
bot_message = respond(message, history, system_msg.value, max_tokens.value, temperature.value, top_p.value, generation_timeout.value) | |
history.append((message, bot_message)) | |
return history, "" | |
submit_btn.click(chat, [user_input, chatbot], [chatbot, user_input]) | |
user_input.submit(chat, [user_input, chatbot], [chatbot, user_input]) | |
clear_btn.click(lambda: ([], ""), None, [chatbot, user_input]) | |
if __name__ == "__main__": | |
demo.launch() |