Gggffx / app.py
Ffftdtd5dtft's picture
Create app.py
d3b8501 verified
import os
import time
import warnings

import gradio as gr
import torch
from diffusers import DiffusionPipeline
from dotenv import load_dotenv
from huggingface_hub import HfApi, HfFolder, Repository
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    AutoModelForSpeechSeq2Seq,
    AutoModelForTextToWaveform,
    AutoProcessor,
    AutoTokenizer,
    DistilBertModel,
    GPT2LMHeadModel,
)
# Load settings from a local .env file into the process environment
# (python-dotenv); presumably used for Hugging Face credentials — confirm.
load_dotenv()
def prune_model(model, amount=0.5):
    """Prune the ``weight`` of every Linear / Conv2d layer found in ``model``.

    Each matching layer is pruned with L1 unstructured pruning and the
    pruning re-parametrization is removed right away, so the layer keeps a
    plain ``weight`` tensor afterwards.

    Args:
        model: Module whose ``named_modules()`` are scanned.
        amount: Forwarded unchanged to ``prune.l1_unstructured``.

    Returns:
        The same ``model`` object (it is modified in place).
    """
    import torch.nn.utils.prune as pruning

    prunable_types = (torch.nn.Linear, torch.nn.Conv2d)
    for _, layer in model.named_modules():
        if not isinstance(layer, prunable_types):
            continue
        pruning.l1_unstructured(layer, name='weight', amount=amount)
        # Make the pruning permanent and drop the mask / original copies.
        pruning.remove(layer, 'weight')
    return model
def quantize_to_q1_with_min(tensor, min_value=-1):
    """Return the element-wise sign of ``tensor`` with a lower bound applied.

    Args:
        tensor: Input tensor; it is not modified (``torch.sign`` allocates
            a new tensor).
        min_value: Entries of the sign tensor that are below this value are
            overwritten with it.

    Returns:
        A new tensor holding the bounded sign values.
    """
    signs = torch.sign(tensor)
    below_floor = signs < min_value
    signs[below_floor] = min_value
    return signs
def quantize_model_to_q1_with_min(model, min_value=-1):
    """Overwrite float16/float32 parameters of ``model`` with bounded sign values.

    Every parameter whose dtype is ``torch.float32`` or ``torch.float16`` is
    replaced, in place, by ``quantize_to_q1_with_min(param.data, min_value)``.
    Parameters of any other dtype are left untouched.

    Args:
        model: Module whose ``named_parameters()`` are rewritten.
        min_value: Lower bound forwarded to ``quantize_to_q1_with_min``.
    """
    float_dtypes = (torch.float32, torch.float16)
    with torch.no_grad():
        for _, weights in model.named_parameters():
            if weights.dtype not in float_dtypes:
                continue
            weights.copy_(quantize_to_q1_with_min(weights.data, min_value))
def disable_unnecessary_components(model):
    """Neutralize dropout and put 1-D batch-norm layers into eval mode.

    ``torch.nn.Dropout`` modules get their probability ``p`` set to 0.0 and
    ``torch.nn.BatchNorm1d`` modules have ``eval()`` called on them. The
    model is changed in place; nothing is returned.
    """
    for _, layer in model.named_modules():
        if isinstance(layer, torch.nn.Dropout):
            layer.p = 0.0
            continue
        if isinstance(layer, torch.nn.BatchNorm1d):
            layer.eval()
def ultra_max_compress(model):
    """Apply the file's most aggressive compression chain to ``model``.

    Steps, in order:
      1. prune 80% of Linear/Conv2d weights (``prune_model``);
      2. replace float parameters by bounded sign values
         (``quantize_model_to_q1_with_min`` with ``min_value=-0.05``);
      3. zero dropout / eval batch-norm (``disable_unnecessary_components``);
      4. freeze every parameter, clamp it to [-1, 1] and cast it to half;
      5. try to TorchScript the model (best effort);
      6. prune again at 90%, switch to eval mode and drop empty buffers.

    NOTE(review): the source indentation was lost; this assumes the clamp
    and half-cast apply to every parameter, not only to those that still
    required gradients — confirm.

    Args:
        model: The ``torch.nn.Module`` to compress (modified in place).

    Returns:
        The compressed model; a scripted module when scripting succeeded,
        otherwise the same object that was passed in.
    """
    model = prune_model(model, amount=0.8)
    quantize_model_to_q1_with_min(model, min_value=-0.05)
    disable_unnecessary_components(model)

    with torch.no_grad():
        for _, param in model.named_parameters():
            param.requires_grad = False
            param.data = torch.nn.functional.hardtanh(param.data, min_val=-1.0, max_val=1.0)
            param.data = param.data.half()

    try:
        model = torch.jit.script(model)
    except Exception as exc:
        # Scripting is an optional speed-up; keep the eager model but make
        # the failure visible instead of discarding it silently.
        warnings.warn(f"torch.jit.script failed, keeping eager model: {exc}")

    prune_model(model, amount=0.9)
    model.eval()

    # Remove empty buffers on the module that owns them. The recursive
    # ``named_buffers()`` yields dotted names ("layer.buf") that are not keys
    # of the root module's ``_buffers``, and a dict must not be mutated while
    # it is being iterated, so snapshot the names first.
    for module in model.modules():
        if isinstance(module, torch.jit.ScriptModule):
            # Scripted modules manage their own buffer containers.
            continue
        empty_names = [
            buffer_name
            for buffer_name, buffer in module._buffers.items()
            if buffer is not None and buffer.numel() == 0
        ]
        for buffer_name in empty_names:
            delattr(module, buffer_name)
    return model
def optimize_model_resources(model):
    """Put ``model`` into a frozen, half-precision inference configuration.

    Side effects:
      * autograd is disabled globally via ``torch.set_grad_enabled(False)``;
      * the model is switched to eval mode;
      * every parameter stops requiring grad, and float32 parameters are
        cast to float16;
      * if the model has a ``config``, its ``max_position_embeddings`` and
        ``hidden_size`` attributes (when present) are capped at 512 and 768.
        Only the config values are changed here; no weights are resized.

    ``torch.jit.optimize_for_inference`` only accepts ScriptModules, so it is
    applied only when the model has already been scripted; eager modules are
    returned as they are instead of raising.

    Args:
        model: The ``torch.nn.Module`` (or ScriptModule) to optimize.

    Returns:
        The optimized model (a new object only for ScriptModules).
    """
    torch.set_grad_enabled(False)
    model.eval()

    for _, param in model.named_parameters():
        param.requires_grad = False
        if param.dtype == torch.float32:
            param.data = param.data.half()

    config = getattr(model, 'config', None)
    if config is not None:
        if hasattr(config, 'max_position_embeddings'):
            config.max_position_embeddings = min(config.max_position_embeddings, 512)
        if hasattr(config, 'hidden_size'):
            config.hidden_size = min(config.hidden_size, 768)

    if isinstance(model, torch.jit.ScriptModule):
        model = torch.jit.optimize_for_inference(model)
    return model
def generate_random_responses(model, tokenizer, prompt, num_responses=5, max_length=50):
    """Sample ``num_responses`` completions of ``prompt`` from ``model``.

    Args:
        model: Object exposing ``generate(input_ids, ...)``.
        tokenizer: Object exposing ``encode`` and ``decode``.
        prompt: Text to complete.
        num_responses: Number of independent samples to draw.
        max_length: Forwarded to ``generate`` as ``max_length``.

    Returns:
        A list with one decoded string per sample.
    """
    # The prompt never changes between samples, so tokenize it once.
    input_ids = tokenizer.encode(prompt, return_tensors="pt")
    responses = []
    for _ in range(num_responses):
        output = model.generate(input_ids, max_length=max_length, do_sample=True, top_k=50)
        responses.append(tokenizer.decode(output[0], skip_special_tokens=True))
    return responses
def patched_distilbert_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    """Replacement ``forward`` for a DistilBERT model.

    Delegates to ``DistilBertModel.forward`` and, when a tuple is requested
    (``return_dict`` false), returns the non-``None`` values among
    ``last_hidden_state``, ``hidden_states`` and ``attentions`` in that order.

    Args:
        self: The DistilBERT model instance this function is bound to.
        input_ids, attention_mask, head_mask, inputs_embeds,
        output_attentions, output_hidden_states: Forwarded unchanged.
        return_dict: Whether to return the structured output; defaults to
            ``self.config.use_return_dict`` when ``None``.

    Returns:
        The structured output object, or a tuple as described above.
    """
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict
    # Always ask the base implementation for the structured output: the
    # fields are read by name below, which is not possible on a plain tuple.
    outputs = DistilBertModel.forward(
        self,
        input_ids,
        attention_mask=attention_mask,
        head_mask=head_mask,
        inputs_embeds=inputs_embeds,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=True,
    )
    if not return_dict:
        candidates = (outputs.last_hidden_state, outputs.hidden_states, outputs.attentions)
        return tuple(value for value in candidates if value is not None)
    return outputs
def patched_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    """Replacement ``forward`` for a DistilBERT classification head.

    Runs ``self.distilbert``, takes the hidden state at sequence position 0
    and feeds it through ``pre_classifier``, ``dropout`` and ``classifier``.
    ``labels`` is accepted but not used; no loss is computed.

    Returns:
        The logits alone when ``return_dict`` is true, otherwise a tuple of
        the logits followed by the remaining encoder outputs.
    """
    if return_dict is None:
        return_dict = self.config.use_return_dict

    encoder_outputs = self.distilbert(
        input_ids,
        attention_mask=attention_mask,
        head_mask=head_mask,
        inputs_embeds=inputs_embeds,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=return_dict,
    )
    first_token_state = encoder_outputs[0][:, 0]
    logits = self.classifier(self.dropout(self.pre_classifier(first_token_state)))

    if return_dict:
        return logits
    return (logits,) + encoder_outputs[1:]
def patched_roberta_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    """Replacement ``forward`` for a RoBERTa classification model.

    Runs ``self.roberta``, takes the hidden state at sequence position 0 and
    passes it through ``dropout`` and ``classifier``. ``labels`` is accepted
    but not used; no loss is computed.

    Returns:
        The logits alone when ``return_dict`` is true, otherwise a tuple of
        the logits followed by the remaining encoder outputs.
    """
    if return_dict is None:
        return_dict = self.config.use_return_dict

    encoder_outputs = self.roberta(
        input_ids,
        attention_mask=attention_mask,
        head_mask=head_mask,
        inputs_embeds=inputs_embeds,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=return_dict,
    )
    first_token_state = encoder_outputs[0][:, 0]
    logits = self.classifier(self.dropout(first_token_state))

    if return_dict:
        return logits
    return (logits,) + encoder_outputs[1:]
def optimize_for_low_resources(model):
    """Compress ``model`` and stamp low-resource limits onto its config.

    Runs ``ultra_max_compress`` followed by ``optimize_model_resources``,
    then sets ``config.max_position_embeddings`` to 256 and
    ``config.hidden_size`` to 384. Only the config values are written here.

    Returns:
        The model returned by the optimization chain.
    """
    compact = optimize_model_resources(ultra_max_compress(model))
    for field, value in (("max_position_embeddings", 256), ("hidden_size", 384)):
        setattr(compact.config, field, value)
    return compact
def optimize_for_very_low_resources(model):
    """Compress ``model`` and stamp very-low-resource limits onto its config.

    Runs ``ultra_max_compress`` followed by ``optimize_model_resources``,
    then sets ``config.max_position_embeddings`` to 128 and
    ``config.hidden_size`` to 256. Only the config values are written here.

    Returns:
        The model returned by the optimization chain.
    """
    compact = optimize_model_resources(ultra_max_compress(model))
    for field, value in (("max_position_embeddings", 128), ("hidden_size", 256)):
        setattr(compact.config, field, value)
    return compact
def remove_unused_model_components(model):
    """Delete every zero-element parameter from ``model`` and its sub-modules.

    The removal happens on the module that owns the parameter. The recursive
    ``named_parameters()`` yields dotted names ("layer.weight") which are not
    keys of the root module's ``_parameters``, and a dict must not be mutated
    while it is iterated, so the names are collected before deleting.

    Args:
        model: The ``torch.nn.Module`` to clean up (modified in place).

    Returns:
        The same ``model`` object.
    """
    for module in model.modules():
        if isinstance(module, torch.jit.ScriptModule):
            # Scripted modules manage their own parameter containers.
            continue
        empty_names = [
            param_name
            for param_name, param in module._parameters.items()
            if param is not None and param.numel() == 0
        ]
        for param_name in empty_names:
            delattr(module, param_name)
    return model
def auto_train_model(model, train_data, epochs=3):
    """Fine-tune ``model`` with Adam (lr=1e-5) over ``train_data``.

    Args:
        model: Module called as ``model(**inputs, labels=labels)``; the
            result must expose a ``loss`` attribute.
        train_data: Iterable of ``(inputs, labels)`` pairs, where ``inputs``
            is a mapping of keyword arguments for the model.
        epochs: Number of passes over ``train_data``.

    Returns:
        The same ``model`` object, left in training mode.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
    model.train()
    # ``optimize_model_resources`` in this file disables autograd globally;
    # re-enable it locally so the backward pass works regardless of call order.
    with torch.enable_grad():
        for _ in range(epochs):
            for inputs, labels in train_data:
                optimizer.zero_grad()
                outputs = model(**inputs, labels=labels)
                outputs.loss.backward()
                optimizer.step()
    return model
def apply_extreme_filters(model):
    """Run the full compression chain with the most aggressive settings.

    Order: ``ultra_max_compress`` -> ``optimize_model_resources`` -> config
    limits (128 positions, hidden size 256) -> TorchScript inference
    optimization (ScriptModules only) -> 95% pruning -> sign quantization
    with ``min_value=-0.1``.

    Args:
        model: The model to compress (modified in place).

    Returns:
        The compressed model.
    """
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)

    # Same guard as in optimize_model_resources: not every model carries a
    # config object.
    if hasattr(model, "config"):
        model.config.max_position_embeddings = 128
        model.config.hidden_size = 256

    # optimize_for_inference rejects eager modules with a RuntimeError, so
    # only apply it when an earlier step produced a ScriptModule.
    if isinstance(model, torch.jit.ScriptModule):
        model = torch.jit.optimize_for_inference(model)

    model = prune_model(model, amount=0.95)
    quantize_model_to_q1_with_min(model, min_value=-0.1)
    return model
def reduce_latency(model, tokenizer, prompt, num_responses=5, max_length=50):
    """Sample completions of ``prompt`` and report the mean time per sample.

    Args:
        model: Object exposing ``generate(input_ids, ...)``.
        tokenizer: Object exposing ``encode`` and ``decode``.
        prompt: Text to complete.
        num_responses: Number of samples to draw.
        max_length: Forwarded to ``generate`` as ``max_length``.

    Returns:
        ``(responses, latency)`` where ``responses`` is the list of decoded
        strings and ``latency`` is the average milliseconds per sample
        (0.0 when no sample was requested).
    """
    responses = []
    # perf_counter is monotonic, unlike the wall clock.
    start_time = time.perf_counter()
    for _ in range(num_responses):
        # Tokenization stays inside the loop: it is part of what is timed.
        input_ids = tokenizer.encode(prompt, return_tensors="pt")
        output = model.generate(input_ids, max_length=max_length, do_sample=True, top_k=50)
        responses.append(tokenizer.decode(output[0], skip_special_tokens=True))
    elapsed = time.perf_counter() - start_time

    # Avoid dividing by zero when no responses were requested.
    latency = elapsed / num_responses * 1000 if num_responses > 0 else 0.0
    return responses, latency
def create_gpt_distill_model():
    """Load the ``gpt2`` checkpoint and its tokenizer.

    Returns:
        ``(model, tokenizer)`` — a ``GPT2LMHeadModel`` and the matching
        tokenizer, both created with ``from_pretrained("gpt2")``.
    """
    checkpoint = "gpt2"
    return (
        GPT2LMHeadModel.from_pretrained(checkpoint),
        AutoTokenizer.from_pretrained(checkpoint),
    )
def create_gemma_distill_model():
    """Load the ``google/gemma-2-9b`` checkpoint and its tokenizer.

    Returns:
        ``(model, tokenizer)``, both created with ``from_pretrained``.
    """
    checkpoint = "google/gemma-2-9b"
    return (
        AutoModelForCausalLM.from_pretrained(checkpoint),
        AutoTokenizer.from_pretrained(checkpoint),
    )
def measure_performance(model, tokenizer, sequence_length=20, num_tokens=100):
    """Time repeated ``model.generate`` calls on a synthetic prompt.

    The prompt is the letter "A" repeated ``sequence_length`` times and
    ``generate`` is invoked ``num_tokens`` times with the tokenized prompt.

    Args:
        model: Object exposing ``generate(**inputs)``.
        tokenizer: Callable returning a mapping of model inputs.
        sequence_length: Number of characters in the synthetic prompt.
        num_tokens: Number of ``generate`` calls to time.

    Returns:
        ``(latency, tokens_per_second)``: mean milliseconds per call and
        calls per second. ``(0.0, 0.0)`` when ``num_tokens`` is not positive.
    """
    inputs = tokenizer("A" * sequence_length, return_tensors="pt")
    if num_tokens <= 0:
        # Nothing to measure; avoid dividing by zero below.
        return 0.0, 0.0

    start_time = time.perf_counter()
    for _ in range(num_tokens):
        model.generate(**inputs)
    elapsed = time.perf_counter() - start_time

    latency = elapsed / num_tokens * 1000
    # A timer reading of exactly zero would otherwise raise ZeroDivisionError.
    tokens_per_second = num_tokens / elapsed if elapsed > 0 else float("inf")
    return latency, tokens_per_second
def apply_diffusion_pipeline(prompt):
    """Generate images for ``prompt`` with ``black-forest-labs/FLUX.1-schnell``.

    The pipeline is loaded with ``from_pretrained`` on every call.

    Returns:
        The ``images`` attribute of the pipeline result.
    """
    flux = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell")
    return flux(prompt).images
def generate_responses_with_diffusion(prompt, use_diffusion):
    """Generate images when ``prompt`` mentions "imagina" or "imagine".

    Args:
        prompt: User text; matched case-insensitively.
        use_diffusion: Accepted but not read by this function.

    Returns:
        The images from ``apply_diffusion_pipeline`` on a keyword match,
        otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("imagina", "imagine")):
        return apply_diffusion_pipeline(prompt)
    return None
def generate_summary_with_bart(prompt):
    """Summarize ``prompt`` with the ``facebook/bart-large-cnn`` checkpoint.

    The tokenizer and model are loaded with ``from_pretrained`` on every
    call. Generation uses beam search (4 beams) with a summary length
    between 30 and 130 tokens.

    Args:
        prompt: Text to summarize.

    Returns:
        The decoded summary string.
    """
    tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")
    model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn")
    # Truncate to the model's position limit: longer inputs would index past
    # the learned position embeddings and fail inside generate().
    inputs = tokenizer.encode(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=model.config.max_position_embeddings,
    )
    summary_ids = model.generate(
        inputs,
        max_length=130,
        min_length=30,
        length_penalty=2.0,
        num_beams=4,
        early_stopping=True,
    )
    return tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def generate_responses_with_bart(prompt):
    """Summarize ``prompt`` when it mentions "resumir" or "resumime".

    Returns:
        The summary from ``generate_summary_with_bart`` on a keyword match
        (case-insensitive), otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("resumir", "resumime")):
        return generate_summary_with_bart(prompt)
    return None
def apply_whisper_pipeline(prompt):
    """Run the ``openai/whisper-small`` checkpoint on ``prompt``.

    The processor and model are loaded with ``from_pretrained`` on every
    call.

    NOTE(review): ``prompt`` is handed to the processor unchanged; callers in
    this file pass plain text while Whisper is a speech model — confirm
    which input is actually intended here.

    Returns:
        The result of ``processor.batch_decode`` on the generated ids.
    """
    checkpoint = "openai/whisper-small"
    processor = AutoProcessor.from_pretrained(checkpoint)
    speech_model = AutoModelForSpeechSeq2Seq.from_pretrained(checkpoint)
    features = processor(prompt, return_tensors="pt")
    generated = speech_model.generate(**features)
    return processor.batch_decode(generated, skip_special_tokens=True)
def generate_transcription_with_whisper(prompt):
    """Transcribe when ``prompt`` mentions "transcribe" or "transcribime".

    Returns:
        The output of ``apply_whisper_pipeline`` on a keyword match
        (case-insensitive), otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("transcribe", "transcribime")):
        return apply_whisper_pipeline(prompt)
    return None
def apply_translation_pipeline(prompt):
    """Run ``prompt`` through the ``google-t5/t5-base`` checkpoint.

    The tokenizer and model are loaded with ``from_pretrained`` on every
    call; generation is capped at ``max_length=50``.

    Returns:
        The decoded text of the first generated sequence.
    """
    checkpoint = "google-t5/t5-base"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    seq2seq = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    encoded = tokenizer.encode(prompt, return_tensors="pt")
    generated = seq2seq.generate(encoded, max_length=50)
    return tokenizer.decode(generated[0], skip_special_tokens=True)
def generate_translation_with_t5(prompt):
    """Translate when ``prompt`` mentions "traducir" or "traducime".

    Returns:
        The output of ``apply_translation_pipeline`` on a keyword match
        (case-insensitive), otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("traducir", "traducime")):
        return apply_translation_pipeline(prompt)
    return None
def apply_musicgen_pipeline(prompt):
    """Generate audio for ``prompt`` with ``facebook/musicgen-small``.

    The tokenizer and model are loaded with ``from_pretrained`` on every
    call.

    Args:
        prompt: Text description of the music to generate.

    Returns:
        Whatever ``model.generate`` returns for the tokenized prompt.
    """
    tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-small")
    model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-small")
    inputs = tokenizer(prompt, return_tensors="pt")
    # The tokenizer returns a mapping of named tensors; unpack it so they
    # reach generate() as keyword arguments instead of a single positional
    # object.
    audio = model.generate(**inputs)
    return audio
def generate_music_with_musicgen(prompt):
    """Generate music when ``prompt`` mentions "música" or "canción".

    Returns:
        The output of ``apply_musicgen_pipeline`` on a keyword match
        (case-insensitive), otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("música", "canción")):
        return apply_musicgen_pipeline(prompt)
    return None
def apply_musicgen_melody_pipeline(prompt):
    """Generate audio for ``prompt`` with ``facebook/musicgen-melody``.

    The tokenizer and model are loaded with ``from_pretrained`` on every
    call.

    Args:
        prompt: Text description of the melody to generate.

    Returns:
        Whatever ``model.generate`` returns for the tokenized prompt.
    """
    tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-melody")
    model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-melody")
    inputs = tokenizer(prompt, return_tensors="pt")
    # The tokenizer returns a mapping of named tensors; unpack it so they
    # reach generate() as keyword arguments instead of a single positional
    # object.
    audio = model.generate(**inputs)
    return audio
def generate_music_with_musicgen_melody(prompt):
    """Generate a melody when ``prompt`` mentions "melodía" or "melodia".

    Returns:
        The output of ``apply_musicgen_melody_pipeline`` on a keyword match
        (case-insensitive), otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("melodía", "melodia")):
        return apply_musicgen_melody_pipeline(prompt)
    return None
def apply_stable_diffusion_pipeline(prompt):
    """Generate images for ``prompt`` with ``stabilityai/stable-diffusion-2-1``.

    The pipeline is loaded with ``from_pretrained`` on every call.

    Returns:
        The ``images`` attribute of the pipeline result.
    """
    stable_diffusion = DiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1")
    return stable_diffusion(prompt).images
def generate_responses_with_stable_diffusion(prompt):
    """Generate images when ``prompt`` mentions "imagen" or "image".

    Returns:
        The output of ``apply_stable_diffusion_pipeline`` on a keyword match
        (case-insensitive), otherwise ``None``.
    """
    lowered = prompt.lower()
    if any(keyword in lowered for keyword in ("imagen", "image")):
        return apply_stable_diffusion_pipeline(prompt)
    return None
def unify_models(*models):
    """Bundle the given modules into a single ``torch.nn.ModuleList``.

    Args:
        *models: Modules to group, kept in the order given.

    Returns:
        A ``ModuleList`` holding the same module objects.
    """
    return torch.nn.ModuleList(models)
def combined_filter(model):
    """Run every compression step of this file on ``model``.

    Order: ``ultra_max_compress`` -> ``optimize_model_resources`` -> config
    limits (128 positions, hidden size 256) -> TorchScript inference
    optimization (ScriptModules only) -> 95% pruning -> sign quantization
    with ``min_value=-0.1``.

    Args:
        model: The model to compress (modified in place).

    Returns:
        The compressed model.
    """
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)

    # Same guard as in optimize_model_resources: not every model carries a
    # config object.
    if hasattr(model, "config"):
        model.config.max_position_embeddings = 128
        model.config.hidden_size = 256

    # optimize_for_inference rejects eager modules with a RuntimeError, so
    # only apply it when an earlier step produced a ScriptModule.
    if isinstance(model, torch.jit.ScriptModule):
        model = torch.jit.optimize_for_inference(model)

    model = prune_model(model, amount=0.95)
    quantize_model_to_q1_with_min(model, min_value=-0.1)
    return model
def apply_filters_and_unify(model):
    """Apply ``combined_filter`` and then strip empty parameters.

    Returns:
        The result of ``remove_unused_model_components`` on the filtered
        model.
    """
    return remove_unused_model_components(combined_filter(model))
def upload_to_huggingface(model, repo_name, tokenizer=None):
    """Save ``model`` (and a tokenizer) locally and upload them to the Hub.

    The artifacts are written to a local directory named ``repo_name`` and
    that directory is then uploaded to the model repository of the same
    name.

    Args:
        model: Object exposing ``save_pretrained(path)``.
        repo_name: Hub repository id; also used as the local directory.
        tokenizer: Optional tokenizer to store next to the model. When
            omitted, a module-level ``tokenizer`` is used if one exists
            (the previous implicit behavior); otherwise no tokenizer is
            saved.
    """
    if tokenizer is None:
        # Backward compatibility: this function used to read the
        # module-level ``tokenizer`` variable directly.
        tokenizer = globals().get("tokenizer")

    api = HfApi()
    # exist_ok tolerates an already existing repo without hiding other
    # failures (bad credentials, invalid name, network errors).
    api.create_repo(repo_id=repo_name, repo_type="model", exist_ok=True)

    model.save_pretrained(repo_name)
    if tokenizer is not None:
        tokenizer.save_pretrained(repo_name)

    # Upload the saved directory over HTTP; this does not require the local
    # directory to be a git clone of the repository.
    api.upload_folder(folder_path=repo_name, repo_id=repo_name, repo_type="model")
def apply_extreme_filters_and_upload(model, repo_name):
    """Compress ``model`` with ``apply_extreme_filters`` and upload the result.

    Args:
        model: The model to compress.
        repo_name: Forwarded to ``upload_to_huggingface``.
    """
    compressed = apply_extreme_filters(model)
    upload_to_huggingface(compressed, repo_name)
def start_gradio_interface():
    """Build and launch the Gradio UI that routes a prompt to every generator.

    The interface has one text input and seven outputs, in this order:
    summary, transcription, translation, music, melody music, image,
    diffusion image.
    """

    def first_or_none(items):
        """Return the first element of a non-empty sequence, else ``None``."""
        return items[0] if items else None

    def process_prompt(prompt):
        """Run every keyword-triggered generator and return one value per output.

        The values are returned as a tuple in the same order as the
        ``outputs`` list below, which is what an Interface with several
        output components expects from its function.
        """
        summary = generate_responses_with_bart(prompt)
        transcription = generate_transcription_with_whisper(prompt)
        if isinstance(transcription, (list, tuple)):
            # The Whisper helper returns a batch of strings; the Textbox
            # shows a single string.
            transcription = " ".join(transcription)
        translation = generate_translation_with_t5(prompt)
        # NOTE(review): the music helpers return the raw generate() output;
        # gr.Audio presumably needs a (sample_rate, array) pair or a file
        # path — confirm and convert once the sampling rate is available.
        music = generate_music_with_musicgen(prompt)
        melody_music = generate_music_with_musicgen_melody(prompt)
        # The image helpers return a list of images; gr.Image shows one.
        image = first_or_none(generate_responses_with_stable_diffusion(prompt))
        diffusion = first_or_none(generate_responses_with_diffusion(prompt, True))
        return summary, transcription, translation, music, melody_music, image, diffusion

    interface = gr.Interface(
        fn=process_prompt,
        inputs=gr.Textbox(label="Enter Prompt"),
        outputs=[
            gr.Textbox(label="Summary"),
            gr.Textbox(label="Transcription"),
            gr.Textbox(label="Translation"),
            gr.Audio(label="Music"),
            gr.Audio(label="Melody Music"),
            gr.Image(label="Image"),
            gr.Image(label="Diffusion"),
        ],
        title="Multi-Function AI Model",
        description="Generate summaries, transcriptions, translations, music, melodies, images, and diffusion responses.",
    )
    interface.launch()
# --- Script section: runs when the module is executed/imported. ---

# Start the web UI.
# NOTE(review): launch() presumably blocks in a plain script, in which case
# the code below only runs after the server stops — confirm the intended order.
start_gradio_interface()

# Optimize each base checkpoint and print a short report.
# NOTE(review): optimize_model_with_all_optimizations is not defined in the
# visible part of this file — confirm where it comes from.
model_infos = [
    {"model_name": "gpt2", "class": GPT2LMHeadModel},
    {"model_name": "google/gemma-2-9b", "class": AutoModelForCausalLM},
]
for model_info in model_infos:
    # `model` and `tokenizer` stay module-level names on purpose:
    # upload_to_huggingface falls back to the module-level `tokenizer`.
    model = model_info["class"].from_pretrained(model_info["model_name"])
    tokenizer = AutoTokenizer.from_pretrained(model_info["model_name"])
    optimized_model, responses, latency = optimize_model_with_all_optimizations(model, tokenizer, "Sample prompt for optimization.")
    print(f"Model: {model_info['model_name']}")
    print(f"Latency: {latency:.2f} ms")
    print(f"Sample Responses: {responses}")

gpt_model, gpt_tokenizer = create_gpt_distill_model()
gemma_model, gemma_tokenizer = create_gemma_distill_model()
optimized_gpt_model, gpt_responses, gpt_latency = optimize_model_with_all_optimizations(gpt_model, gpt_tokenizer, "Sample prompt for GPT optimization.")
optimized_gemma_model, gemma_responses, gemma_latency = optimize_model_with_all_optimizations(gemma_model, gemma_tokenizer, "Sample prompt for Gemma optimization.")
combined_model = unify_models(optimized_gpt_model, optimized_gemma_model)

# NOTE(review): optimize_for_1gb_ram, optimize_for_old_cpu and
# optimize_for_old_gpu are not defined in the visible part of this file —
# confirm where they come from.
optimized_gpt_model_1gb = optimize_for_1gb_ram(optimized_gpt_model)
optimized_gemma_model_1gb = optimize_for_1gb_ram(optimized_gemma_model)
optimized_gpt_model_low = optimize_for_very_low_resources(optimized_gpt_model)
optimized_gemma_model_low = optimize_for_very_low_resources(optimized_gemma_model)
optimized_gpt_model_cpu = optimize_for_old_cpu(optimized_gpt_model)
optimized_gemma_model_cpu = optimize_for_old_cpu(optimized_gemma_model)
optimized_gpt_model_gpu = optimize_for_old_gpu(optimized_gpt_model)
optimized_gemma_model_gpu = optimize_for_old_gpu(optimized_gemma_model)
print("Models optimized for various resource constraints.")

# Smoke-test every keyword-triggered generator. Only
# generate_responses_with_diffusion takes a second argument; the other
# helpers accept just the prompt, so no extra flag is passed to them.
diffusion_response = generate_responses_with_diffusion("Imagine a serene landscape", True)
if diffusion_response:
    print("Diffusion response generated.")
summary_response = generate_responses_with_bart("Resumir este texto para obtener un resumen efectivo.")
if summary_response:
    print("Summary response generated.")
transcription_response = generate_transcription_with_whisper("Transcribe this audio file.")
if transcription_response:
    print("Transcription response generated.")
translation_response = generate_translation_with_t5("Traducir este texto al inglés.")
if translation_response:
    print("Translation response generated.")
# The music helpers return tensors; truth-testing a multi-element tensor
# raises, so compare against None instead.
music_response = generate_music_with_musicgen("Música para una tarde tranquila.")
if music_response is not None:
    print("Music response generated.")
melody_music_response = generate_music_with_musicgen_melody("Melodía para relajación.")
if melody_music_response is not None:
    print("Melody music response generated.")
image_response = generate_responses_with_stable_diffusion("Imagen de un paisaje sereno.")
if image_response:
    print("Image response generated.")

# NOTE(review): combined_model is a torch.nn.ModuleList, which presumably has
# no save_pretrained() — confirm what should be uploaded here.
upload_to_huggingface(combined_model, "Ffftdtd5dtft/my_model")