import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    AutoProcessor,
    AutoModelForSpeechSeq2Seq,
    AutoModelForTextToWaveform,
    GPT2LMHeadModel,
    DistilBertModel
)
from diffusers import DiffusionPipeline
import time
import os
from dotenv import load_dotenv
from huggingface_hub import HfApi, HfFolder, Repository
import gradio as gr

load_dotenv()
def prune_model(model, amount=0.5):
    from torch.nn.utils import prune
    for name, module in model.named_modules():
        if isinstance(module, (torch.nn.Linear, torch.nn.Conv2d)):
            prune.l1_unstructured(module, name='weight', amount=amount)
            prune.remove(module, 'weight')
    return model
def quantize_to_q1_with_min(tensor, min_value=-1):
    tensor = torch.sign(tensor)
    tensor[tensor < min_value] = min_value
    return tensor
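# Illustrative note (not part of the original script): with the default min_value=-1
# the sign() output is never clamped, while a higher floor such as -0.05 (the value
# used in ultra_max_compress below) lifts the negative weights to that floor, e.g.
#   quantize_to_q1_with_min(torch.tensor([-0.3, 0.0, 2.1]), min_value=-0.05)
#   -> tensor([-0.0500, 0.0000, 1.0000])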
def quantize_model_to_q1_with_min(model, min_value=-1):
    for name, param in model.named_parameters():
        if param.dtype in [torch.float32, torch.float16]:
            with torch.no_grad():
                param.copy_(quantize_to_q1_with_min(param.data, min_value))
def disable_unnecessary_components(model):
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Dropout):
            module.p = 0.0
        elif isinstance(module, torch.nn.BatchNorm1d):
            module.eval()
def ultra_max_compress(model):
    model = prune_model(model, amount=0.8)
    quantize_model_to_q1_with_min(model, min_value=-0.05)
    disable_unnecessary_components(model)
    with torch.no_grad():
        for name, param in model.named_parameters():
            if param.requires_grad:
                param.requires_grad = False
            param.data = torch.nn.functional.hardtanh(param.data, min_val=-1.0, max_val=1.0)
            param.data = param.data.half()
    try:
        model = torch.jit.script(model)
    except Exception:
        pass
    prune_model(model, amount=0.9)
    model.eval()
    # named_buffers() yields dotted names, so empty buffers must be removed from the
    # submodule that owns them rather than from the top-level module.
    for buffer_name, buffer in list(model.named_buffers()):
        if buffer.numel() == 0:
            owner_name, _, attr = buffer_name.rpartition('.')
            owner = model.get_submodule(owner_name) if owner_name else model
            owner._buffers.pop(attr, None)
    return model
def optimize_model_resources(model):
    torch.set_grad_enabled(False)
    model.eval()
    for name, param in model.named_parameters():
        param.requires_grad = False
        if param.dtype == torch.float32:
            param.data = param.data.half()
    if hasattr(model, 'config'):
        if hasattr(model.config, 'max_position_embeddings'):
            model.config.max_position_embeddings = min(model.config.max_position_embeddings, 512)
        if hasattr(model.config, 'hidden_size'):
            model.config.hidden_size = min(model.config.hidden_size, 768)
    try:
        # torch.jit.optimize_for_inference requires a scripted (TorchScript) module;
        # fall back to the unmodified model if the conversion is not available.
        model = torch.jit.optimize_for_inference(model)
    except Exception:
        pass
    return model
def generate_random_responses(model, tokenizer, prompt, num_responses=5, max_length=50):
    responses = []
    for _ in range(num_responses):
        input_ids = tokenizer.encode(prompt, return_tensors="pt")
        output = model.generate(input_ids, max_length=max_length, do_sample=True, top_k=50)
        response = tokenizer.decode(output[0], skip_special_tokens=True)
        responses.append(response)
    return responses
def patched_distilbert_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict
    outputs = DistilBertModel.forward(self, input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict)
    if not return_dict:
        output_tuple = []
        for v in [outputs.last_hidden_state, outputs.hidden_states, outputs.attentions]:
            if v is not None:
                output_tuple.append(v)
        return tuple(output_tuple)
    return outputs
def patched_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict
    outputs = self.distilbert(input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict)
    hidden_state = outputs[0]
    pooled_output = self.pre_classifier(hidden_state[:, 0])
    pooled_output = self.dropout(pooled_output)
    logits = self.classifier(pooled_output)
    if not return_dict:
        output = (logits,) + outputs[1:]
        return output
    return logits
def patched_roberta_forward(self, input_ids=None, attention_mask=None, head_mask=None, inputs_embeds=None, labels=None, output_attentions=None, output_hidden_states=None, return_dict=None):
    return_dict = return_dict if return_dict is not None else self.config.use_return_dict
    outputs = self.roberta(input_ids, attention_mask=attention_mask, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict)
    hidden_state = outputs[0]
    pooled_output = hidden_state[:, 0]
    pooled_output = self.dropout(pooled_output)
    logits = self.classifier(pooled_output)
    if not return_dict:
        output = (logits,) + outputs[1:]
        return output
    return logits
def optimize_for_low_resources(model):
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)
    model.config.max_position_embeddings = 256
    model.config.hidden_size = 384
    return model
def optimize_for_very_low_resources(model):
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)
    model.config.max_position_embeddings = 128
    model.config.hidden_size = 256
    return model
def remove_unused_model_components(model):
    # named_parameters() yields dotted names, so empty parameters must be removed
    # from the submodule that owns them rather than from the top-level module.
    for name, param in list(model.named_parameters()):
        if param.numel() == 0:
            owner_name, _, attr = name.rpartition('.')
            owner = model.get_submodule(owner_name) if owner_name else model
            owner._parameters.pop(attr, None)
    return model
def auto_train_model(model, train_data, epochs=3):
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
    model.train()
    for epoch in range(epochs):
        for batch in train_data:
            inputs, labels = batch
            optimizer.zero_grad()
            outputs = model(**inputs, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
    return model
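# Usage note (not part of the original script): auto_train_model expects train_data
# to yield (inputs, labels) pairs where inputs is a dict of tensors the model accepts,
# e.g. [(tokenizer("example text", return_tensors="pt"), torch.tensor([0]))].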
def apply_extreme_filters(model):
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)
    model.config.max_position_embeddings = 128
    model.config.hidden_size = 256
    try:
        # Skip TorchScript inference optimization if the model is not scripted.
        model = torch.jit.optimize_for_inference(model)
    except Exception:
        pass
    model = prune_model(model, amount=0.95)
    quantize_model_to_q1_with_min(model, min_value=-0.1)
    return model
def reduce_latency(model, tokenizer, prompt, num_responses=5, max_length=50):
    responses = []
    start_time = time.time()
    for _ in range(num_responses):
        input_ids = tokenizer.encode(prompt, return_tensors="pt")
        output = model.generate(input_ids, max_length=max_length, do_sample=True, top_k=50)
        response = tokenizer.decode(output[0], skip_special_tokens=True)
        responses.append(response)
    end_time = time.time()
    latency = (end_time - start_time) / num_responses * 1000
    return responses, latency
def create_gpt_distill_model():
    gpt_model = GPT2LMHeadModel.from_pretrained("gpt2")
    gpt_tokenizer = AutoTokenizer.from_pretrained("gpt2")
    return gpt_model, gpt_tokenizer
def create_gemma_distill_model():
    gemma_model = AutoModelForCausalLM.from_pretrained("google/gemma-2-9b")
    gemma_tokenizer = AutoTokenizer.from_pretrained("google/gemma-2-9b")
    return gemma_model, gemma_tokenizer
def measure_performance(model, tokenizer, sequence_length=20, num_tokens=100):
    inputs = tokenizer("A" * sequence_length, return_tensors="pt")
    start_time = time.time()
    for _ in range(num_tokens):
        model.generate(**inputs)
    end_time = time.time()
    latency = (end_time - start_time) / num_tokens * 1000
    tokens_per_second = num_tokens / (end_time - start_time)
    return latency, tokens_per_second
def apply_diffusion_pipeline(prompt):
    diffusion_pipeline = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell")
    images = diffusion_pipeline(prompt).images
    return images
def generate_responses_with_diffusion(prompt, use_diffusion):
    if "imagina" in prompt.lower() or "imagine" in prompt.lower():
        images = apply_diffusion_pipeline(prompt)
        return images
    return None
def generate_summary_with_bart(prompt):
    tokenizer = AutoTokenizer.from_pretrained("facebook/bart-large-cnn")
    model = AutoModelForSeq2SeqLM.from_pretrained("facebook/bart-large-cnn")
    inputs = tokenizer.encode(prompt, return_tensors="pt")
    summary_ids = model.generate(inputs, max_length=130, min_length=30, length_penalty=2.0, num_beams=4, early_stopping=True)
    summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
    return summary
def generate_responses_with_bart(prompt):
    if "resumir" in prompt.lower() or "resumime" in prompt.lower():
        summary = generate_summary_with_bart(prompt)
        return summary
    return None
def apply_whisper_pipeline(prompt):
    processor = AutoProcessor.from_pretrained("openai/whisper-small")
    model = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-small")
    # NOTE: the Whisper processor expects an audio array; passing the text prompt
    # here only acts as a placeholder until real audio input is wired in.
    inputs = processor(prompt, return_tensors="pt")
    outputs = model.generate(**inputs)
    transcription = processor.batch_decode(outputs, skip_special_tokens=True)
    return transcription
def generate_transcription_with_whisper(prompt):
    if "transcribe" in prompt.lower() or "transcribime" in prompt.lower():
        transcription = apply_whisper_pipeline(prompt)
        return transcription
    return None
def apply_translation_pipeline(prompt):
    tokenizer = AutoTokenizer.from_pretrained("google-t5/t5-base")
    model = AutoModelForSeq2SeqLM.from_pretrained("google-t5/t5-base")
    inputs = tokenizer.encode(prompt, return_tensors="pt")
    translated_ids = model.generate(inputs, max_length=50)
    translated_text = tokenizer.decode(translated_ids[0], skip_special_tokens=True)
    return translated_text
def generate_translation_with_t5(prompt):
    if "traducir" in prompt.lower() or "traducime" in prompt.lower():
        translation = apply_translation_pipeline(prompt)
        return translation
    return None
def apply_musicgen_pipeline(prompt):
    tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-small")
    model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-small")
    inputs = tokenizer(prompt, return_tensors="pt")
    audio = model.generate(**inputs)
    return audio
def generate_music_with_musicgen(prompt):
    if "música" in prompt.lower() or "canción" in prompt.lower():
        music = apply_musicgen_pipeline(prompt)
        return music
    return None
def apply_musicgen_melody_pipeline(prompt):
    tokenizer = AutoTokenizer.from_pretrained("facebook/musicgen-melody")
    model = AutoModelForTextToWaveform.from_pretrained("facebook/musicgen-melody")
    inputs = tokenizer(prompt, return_tensors="pt")
    audio = model.generate(**inputs)
    return audio
def generate_music_with_musicgen_melody(prompt):
    if "melodía" in prompt.lower() or "melodia" in prompt.lower():
        music = apply_musicgen_melody_pipeline(prompt)
        return music
    return None
def apply_stable_diffusion_pipeline(prompt):
    pipeline = DiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1")
    images = pipeline(prompt).images
    return images
def generate_responses_with_stable_diffusion(prompt):
    if "imagen" in prompt.lower() or "image" in prompt.lower():
        images = apply_stable_diffusion_pipeline(prompt)
        return images
    return None
def unify_models(*models):
    combined_model = torch.nn.ModuleList(models)
    return combined_model
def combined_filter(model):
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)
    model.config.max_position_embeddings = 128
    model.config.hidden_size = 256
    try:
        # Skip TorchScript inference optimization if the model is not scripted.
        model = torch.jit.optimize_for_inference(model)
    except Exception:
        pass
    model = prune_model(model, amount=0.95)
    quantize_model_to_q1_with_min(model, min_value=-0.1)
    return model
def apply_filters_and_unify(model):
    model = combined_filter(model)
    model = remove_unused_model_components(model)
    return model
def upload_to_huggingface(model, repo_name, tokenizer=None):
    api = HfApi()
    try:
        api.create_repo(repo_id=repo_name, repo_type="model")
    except Exception:
        pass
    model.save_pretrained(repo_name)
    if tokenizer is not None:
        tokenizer.save_pretrained(repo_name)
    repo = Repository(repo_name)
    repo.push_to_hub()
def apply_extreme_filters_and_upload(model, repo_name):
    model = apply_extreme_filters(model)
    upload_to_huggingface(model, repo_name)
def start_gradio_interface():
    def process_prompt(prompt):
        # Return one value per output component, in the same order as `outputs` below.
        return (
            generate_responses_with_bart(prompt),
            generate_transcription_with_whisper(prompt),
            generate_translation_with_t5(prompt),
            generate_music_with_musicgen(prompt),
            generate_music_with_musicgen_melody(prompt),
            generate_responses_with_stable_diffusion(prompt),
            generate_responses_with_diffusion(prompt, True)
        )
    interface = gr.Interface(
        fn=process_prompt,
        inputs=gr.Textbox(label="Enter Prompt"),
        outputs=[gr.Textbox(label="Summary"), gr.Textbox(label="Transcription"), gr.Textbox(label="Translation"),
                 gr.Audio(label="Music"), gr.Audio(label="Melody Music"), gr.Image(label="Image"), gr.Image(label="Diffusion")],
        title="Multi-Function AI Model",
        description="Generate summaries, transcriptions, translations, music, melodies, images, and diffusion responses."
    )
    interface.launch()
start_gradio_interface()
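# optimize_model_with_all_optimizations is called below but is not defined in the
# original script. The following is a minimal sketch under that assumption: it chains
# the compression helpers above and reports latency via reduce_latency; the actual
# original implementation is unknown.
def optimize_model_with_all_optimizations(model, tokenizer, prompt):
    model = ultra_max_compress(model)
    model = optimize_model_resources(model)
    responses, latency = reduce_latency(model, tokenizer, prompt)
    return model, responses, latency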
model_infos = [
    {"model_name": "gpt2", "class": GPT2LMHeadModel},
    {"model_name": "google/gemma-2-9b", "class": AutoModelForCausalLM}
]
for model_info in model_infos:
    model = model_info["class"].from_pretrained(model_info["model_name"])
    tokenizer = AutoTokenizer.from_pretrained(model_info["model_name"])
    optimized_model, responses, latency = optimize_model_with_all_optimizations(model, tokenizer, "Sample prompt for optimization.")
    print(f"Model: {model_info['model_name']}")
    print(f"Latency: {latency:.2f} ms")
    print(f"Sample Responses: {responses}")
gpt_model, gpt_tokenizer = create_gpt_distill_model()
gemma_model, gemma_tokenizer = create_gemma_distill_model()
optimized_gpt_model, gpt_responses, gpt_latency = optimize_model_with_all_optimizations(gpt_model, gpt_tokenizer, "Sample prompt for GPT optimization.")
optimized_gemma_model, gemma_responses, gemma_latency = optimize_model_with_all_optimizations(gemma_model, gemma_tokenizer, "Sample prompt for Gemma optimization.")
combined_model = unify_models(optimized_gpt_model, optimized_gemma_model)
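# optimize_for_1gb_ram, optimize_for_old_cpu and optimize_for_old_gpu are used below
# but are not defined in the original script. These are minimal sketches, assuming they
# reuse the resource-reduction helpers above; the original implementations are unknown.
def optimize_for_1gb_ram(model):
    return optimize_for_very_low_resources(model)
def optimize_for_old_cpu(model):
    model = optimize_for_low_resources(model)
    return model.to("cpu")
def optimize_for_old_gpu(model):
    model = optimize_for_low_resources(model)
    return model.to("cuda") if torch.cuda.is_available() else model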
optimized_gpt_model_1gb = optimize_for_1gb_ram(optimized_gpt_model)
optimized_gemma_model_1gb = optimize_for_1gb_ram(optimized_gemma_model)
optimized_gpt_model_low = optimize_for_very_low_resources(optimized_gpt_model)
optimized_gemma_model_low = optimize_for_very_low_resources(optimized_gemma_model)
optimized_gpt_model_cpu = optimize_for_old_cpu(optimized_gpt_model)
optimized_gemma_model_cpu = optimize_for_old_cpu(optimized_gemma_model)
optimized_gpt_model_gpu = optimize_for_old_gpu(optimized_gpt_model)
optimized_gemma_model_gpu = optimize_for_old_gpu(optimized_gemma_model)
print("Models optimized for various resource constraints.")
diffusion_response = generate_responses_with_diffusion("Imagine a serene landscape", True)
if diffusion_response:
    print("Diffusion response generated.")
summary_response = generate_responses_with_bart("Resumir este texto para obtener un resumen efectivo.")
if summary_response:
    print("Summary response generated.")
transcription_response = generate_transcription_with_whisper("Transcribe this audio file.")
if transcription_response:
    print("Transcription response generated.")
translation_response = generate_translation_with_t5("Traducir este texto al inglés.")
if translation_response:
    print("Translation response generated.")
music_response = generate_music_with_musicgen("Música para una tarde tranquila.")
if music_response:
    print("Music response generated.")
melody_music_response = generate_music_with_musicgen_melody("Melodía para relajación.")
if melody_music_response:
    print("Melody music response generated.")
image_response = generate_responses_with_stable_diffusion("Imagen de un paisaje sereno.")
if image_response:
    print("Image response generated.")
upload_to_huggingface(combined_model, "Ffftdtd5dtft/my_model")