Spaces:
Runtime error
Runtime error
project_index = 0 # Initialize project_index with a default value | |
import gradio as gr,os,shutil,numpy as np,hashlib,subprocess,pandas as pd | |
from PIL import Image | |
from datetime import datetime | |
import base64 | |
import requests | |
import io | |
import json | |
import logging | |
import re | |
""" | |
Gradio App Interface Specification: | |
Inputs: | |
- prompt: Text Input. Description: 'Enter a prompt for image generation.' | |
- steps: Slider. Range: [1, 32]. value: 16. Description: 'Number of steps for image generation.' | |
- model: Dropdown. Options: ['TurboAnime.saftensors', 'OtherModel']. value: 'TurboAnime.saftensors'. Description: 'Select the model for image generation.' | |
- styles: CheckboxGroup. Options: ['RayORender', 'OtherStyle']. value: ['RayORender']. Description: 'Select styles for image generation.' | |
Outputs: | |
- image: Image. Description: 'Generated image based on the prompt.' | |
- log: Text. Description: 'Log of the image generation process.' | |
""" | |
import gradio as gr | |
import subprocess | |
import json | |
import base64 | |
from PIL import Image | |
import io | |
import time | |
# π Start time for the entire script | |
start_time = time.time() | |
def analyze_all_images(custom_prompt): | |
global file_array | |
descriptions = [] | |
for file_info in file_array: | |
file_path = find_file_by_hash(file_info["hash"]) | |
if file_path: | |
with Image.open(file_path) as img: | |
description = get_image_description(img, custom_prompt) | |
save_text(file_info["hash"], description, "", "", True) # Save the description to the text file | |
descriptions.append(description) | |
return descriptions | |
def generate_status_message(index, total, file_name): | |
return f"Processing {index + 1}/{total}: {file_name}" | |
def analyze_thumbs_up_images(custom_prompt): | |
thumbs_up_images = load_thumbs_up_gallery() # Assuming this function returns full paths | |
total_images = len(thumbs_up_images) | |
descriptions = [] | |
status_messages = [] | |
for index, image_path in enumerate(thumbs_up_images): | |
with Image.open(image_path) as img: | |
description = get_image_description(img, custom_prompt) | |
descriptions.append((os.path.basename(image_path), description)) | |
status_messages.append(generate_status_message(index, total_images, os.path.basename(image_path))) | |
save_text(file_info["hash"], description, "", "", True) # Save the description to the text file | |
return descriptions, status_messages | |
# π’ Log Status | |
def log_status(progress, message): | |
elapsed_time = time.time() - start_time | |
log_message = {"status": message, "progress": f"{progress}%", "time_elapsed": f"{elapsed_time:.2f} seconds"} | |
return json.dumps(log_message) | |
def process_files(file_info): | |
try: | |
if file_info: | |
return [_process_file(file_path) for file_path in file_info] if isinstance(file_info, list) else [_process_file(file_info)] | |
return [] | |
except Exception as e: | |
print(f"Error processing files: {e}") | |
raise e | |
def clear_uploads_folder(): | |
os.makedirs(recycle_bin_folder, exist_ok=True) | |
for file_name in os.listdir(_get_project_folder()): | |
dest_file_path = os.path.join(recycle_bin_folder, file_name) | |
if os.path.exists(dest_file_path): | |
base, extension = os.path.splitext(file_name) | |
i = 1 | |
new_file_name = f"{base}_{i}{extension}" | |
new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name) | |
while os.path.exists(new_dest_file_path): | |
i += 1 | |
new_file_name = f"{base}_{i}{extension}" | |
new_dest_file_path = os.path.join(recycle_bin_folder, new_file_name) | |
dest_file_path = new_dest_file_path | |
shutil.move(os.path.join(_get_project_folder(), file_name), dest_file_path) | |
return "Uploads folder cleared!" | |
def undo_last_deletion(): | |
file_list = os.listdir(recycle_bin_folder) | |
if file_list: | |
last_deleted_file = file_list[-1] | |
shutil.move(os.path.join(recycle_bin_folder, last_deleted_file), _get_project_folder()) | |
return f"Restored: {last_deleted_file}" | |
return "No files to restore" | |
def next_session(): | |
global project_index | |
project_index += 1 | |
_init_project_directory() | |
return f"Switched to the next session! Current session index: {project_index}" | |
def previous_session(): | |
global project_index | |
if project_index > 0: project_index -= 1 | |
return f"Switched to the previous session! Current session index: {project_index}" | |
def get_next_image(): | |
global current_image | |
images = get_images() | |
if images: | |
current_image = images[0] if current_image is None else images[(images.index(current_image) + 1) % len(images)] | |
else: | |
current_image = default_image | |
return current_image | |
def get_previous_image(): | |
global current_image | |
images = get_images() | |
if images: | |
current_image = images[-1] if current_image is None else images[(images.index(current_image) - 1) % len(images)] | |
else: | |
current_image = default_image | |
return current_image | |
def process_all_zips(): | |
for file_name in os.listdir(_get_project_folder()): | |
file_path = os.path.join(_get_project_folder(), file_name) | |
if zipfile.is_zipfile(file_path): _process_file(file_path) | |
return "All zip files processed!" | |
# π API Call | |
def make_api_call(url, payload): | |
try: | |
response = subprocess.run(["curl", "-X", "POST", url, "-H", "Content-Type: application/json", "--data", json.dumps(payload)], capture_output=True, text=True) | |
if response.returncode == 0: | |
return json.loads(response.stdout), None | |
else: | |
return None, log_status(progress, f"API call failed with return code: {response.returncode}") | |
except Exception as e: | |
return None, log_status(progress, f"API call failed with exception: {str(e)}") | |
# πΌοΈ Save Image | |
def save_image(image_data): | |
image = Image.open(io.BytesIO(base64.b64decode(image_data))) | |
return image | |
# π Main Execution Function | |
def generate_image(prompt, steps, model, styles): | |
base_url = "http://73.255.78.150:7909" | |
base_payload = { | |
"prompt": prompt, | |
"steps": steps, | |
"model": model, | |
"styles": styles, | |
"negative_prompt": "album, duplicate, crowded, multiple, stuff, messy, photo, collage, doll, caricature, render. mannequin.", | |
"seed": -1, | |
"height": 768, | |
"width": 1280, | |
"sampler_index": "DPM++ 2M SDE Karras", | |
"restore_faces": False, | |
"tiling": False, | |
"n_iter": 1, | |
"batch_size": 1, | |
"cfg_scale": 2.0, | |
"subseed": -1, | |
"subseed_strength": 0.0, | |
"seed_resize_from_h": -1, | |
"seed_resize_from_w": -1, | |
"seed_enable_extras": False, | |
"enable_hr": False, | |
"denoising_strength": 0.0, | |
"hr_scale": 2.0, | |
"hr_upscaler": "Latent", | |
"hr_second_pass_steps": 0, | |
"hr_resize_x": 0, | |
"hr_resize_y": 0, | |
"hr_sampler_index": "", | |
"hr_prompt": "", | |
"hr_negative_prompt": "", | |
"override_settings_texts": "" | |
} | |
progress = 0 | |
log = log_status(progress, "Starting image generation...") | |
response, error_log = make_api_call(f"{base_url}/sdapi/v1/txt2img", base_payload) | |
if error_log: | |
return None, error_log | |
if response and 'images' in response: | |
image_data = response['images'][0] | |
image = save_image(image_data) | |
log = log_status(100, "Image generated successfully.") | |
return image, log | |
else: | |
log = log_status(progress, "Failed to generate image.") | |
return None, log | |
max_image_size = 768 | |
image_folder = "/Users/gev7418/Library/CloudStorage/OneDrive-Personal/Gradio Script Building Blocks/HatchDuo-Gradio/Images" | |
thumbs_up_folder = os.path.join(image_folder, "Thumbs_Up") | |
thumbs_down_folder = os.path.join(image_folder, "Thumbs_Down") | |
allowed_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp'] | |
folders_cycle = [image_folder, thumbs_up_folder, thumbs_down_folder] | |
# Set up logging to print to the terminal | |
logging.basicConfig(level=logging.INFO) | |
# OpenAI API Key | |
api_key = "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" # Replace with your OpenAI API Key | |
import subprocess | |
def convert_and_rename_files_in_directory(directory, original_extension, new_extension, new_base_name): | |
""" | |
Converts all files in the specified directory from the original extension to PNG, and renames them to a new base name followed by a number. | |
""" | |
files = [f for f in os.listdir(directory) if f.endswith(original_extension)] | |
for index, file in enumerate(sorted(files)): | |
new_name = f"{new_base_name}_{index}{new_extension}" | |
original_file_path = os.path.join(directory, file) | |
new_file_path = os.path.join(directory, new_name) | |
# Convert to PNG using a subprocess call to a command line tool like ImageMagick | |
subprocess.run(['convert', original_file_path, new_file_path]) | |
os.rename(new_file_path, os.path.join(directory, new_name)) | |
logging.info(f"All {original_extension} files in {directory} have been converted to PNG and renamed to {new_base_name} format.") | |
def encode_image(image): | |
""" | |
This function converts the image to bytes and returns the base64 encoding of the image file | |
""" | |
logging.info("Encoding image to base64") | |
image_bytes = io.BytesIO() | |
if image.mode == 'RGBA': | |
# Convert RGBA to RGB | |
image = image.convert('RGB') | |
image.save(image_bytes, format='JPEG') | |
image_bytes = image_bytes.getvalue() | |
base64_image = base64.b64encode(image_bytes).decode('utf-8') | |
return base64_image | |
def get_image_description(image, custom_prompt): | |
""" | |
This function sends the image to the OpenAI API and returns the response. | |
It now checks if there's custom prompt content before sending it to the API. | |
""" | |
logging.info("Getting image description from OpenAI API") | |
base64_image = encode_image(image) | |
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} | |
# Check if custom_prompt is not empty, otherwise use a default prompt | |
if not custom_prompt.strip(): | |
custom_prompt = "Describe this image" | |
payload = { | |
"model": "gpt-4-vision-preview", | |
"messages": [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": custom_prompt}, | |
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}} | |
] | |
} | |
], | |
"max_tokens": 300 | |
} | |
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) | |
response_payload = json.loads(response.text) | |
content = response_payload['choices'][0]['message']['content'] | |
logging.info("Received response from OpenAI API") | |
return content | |
def create_directories(): | |
print("Creating directories...") | |
os.makedirs(thumbs_up_folder, exist_ok=True) | |
os.makedirs(thumbs_down_folder, exist_ok=True) | |
create_directories() | |
def get_files(folder): | |
print(f"Getting files from {folder}...") | |
return [os.path.join(dp, f) for dp, dn, filenames in os.walk(folder) for f in filenames if os.path.splitext(f)[1].lower() in allowed_extensions] | |
def hash_image_pixels(file_path): | |
""" | |
Generates a hash for an image based on its pixel content. | |
""" | |
if os.path.splitext(file_path)[1].lower() in allowed_extensions: | |
with Image.open(file_path) as img: | |
# Convert the image to RGBA (to standardize if images are in different modes) | |
img_rgba = img.convert('RGBA') | |
# Calculate the new height and width to maintain aspect ratio | |
aspect_ratio = img_rgba.width / img_rgba.height | |
new_height = min(max_image_size, img_rgba.height) | |
new_width = int(aspect_ratio * new_height) | |
if img_rgba.height > max_image_size: | |
new_height = max_image_size | |
new_width = int(new_height * aspect_ratio) | |
# Use Image.Resampling.LANCZOS for better quality resizing | |
img_resized = img_rgba.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
# Get the bytes of the resized image data | |
img_bytes = io.BytesIO() | |
img_resized.save(img_bytes, format='PNG') # PNG format to ensure consistency across platforms | |
img_bytes = img_bytes.getvalue() | |
# Generate a hash of the resized image bytes | |
hash_obj = hashlib.sha256(img_bytes) | |
return hash_obj.hexdigest() | |
else: | |
return None | |
# Global cache for hashes to file paths | |
hash_to_path_cache = {} | |
def update_hash_to_path_cache(): | |
global hash_to_path_cache | |
hash_to_path_cache.clear() | |
for folder in folders_cycle: | |
for file_path in get_files(folder): | |
file_hash = hash_image_pixels(file_path) | |
hash_to_path_cache[file_hash] = file_path | |
def find_file_by_hash(file_hash): | |
# Use the cache to find the file path | |
file_path = hash_to_path_cache.get(file_hash, None) | |
if file_path and os.path.exists(file_path): | |
return file_path | |
# If the file was not found or doesn't exist at the cached location, | |
# search in the thumbs up and thumbs down folders | |
for folder in [thumbs_up_folder, thumbs_down_folder]: | |
for dp, dn, filenames in os.walk(folder): | |
for f in filenames: | |
potential_path = os.path.join(dp, f) | |
if os.path.splitext(f)[1].lower() in allowed_extensions: | |
if hash_image_pixels(potential_path) == file_hash: | |
# Update the cache with the new location | |
hash_to_path_cache[file_hash] = potential_path | |
return potential_path | |
# If the file is not found in any of the locations, return None | |
return None | |
def build_file_array(): | |
print("Building file array...") | |
files = [] | |
for folder in folders_cycle: | |
for file_path in get_files(folder): | |
file_hash = hash_image_pixels(file_path) | |
status = "π Rated Up" if folder == thumbs_up_folder else "π Rated Down" if folder == thumbs_down_folder else "β³ Pending" | |
files.append({"path": file_path, "hash": file_hash, "status": status, "date_modified": os.path.getmtime(file_path)}) | |
# Create text files for any images missing their text file counterpart | |
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') | |
if not os.path.exists(text_file_path): | |
with open(text_file_path, 'w') as text_file: | |
text_file.write("") # Create an empty text file | |
files = sorted(files, key=lambda x: x["date_modified"], reverse=True) | |
return files | |
def refresh_file_array_and_hashes(): | |
global file_array, hash_list | |
file_array = build_file_array() | |
hash_list = [file['hash'] for file in file_array] | |
print("File array and hash list updated.") | |
update_hash_to_path_cache() | |
refresh_file_array_and_hashes() | |
current_index = 0 | |
def save_text(file_hash, text, prepend_text="", append_text="", save_and_overwrite_changes=True): | |
if not save_and_overwrite_changes: | |
print("Skipping saving due to user preference.") | |
return | |
file_path = find_file_by_hash(file_hash) | |
if file_path: | |
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') | |
final_text = "" | |
if not save_and_overwrite_changes and os.path.exists(text_file_path): | |
with open(text_file_path, 'r') as existing_file: | |
existing_content = existing_file.read() | |
final_text = (prepend_text + ", " if prepend_text else "") + existing_content + (", " + append_text if append_text else "") | |
else: | |
final_text = (prepend_text + ", " if prepend_text else "") + text + (", " + append_text if append_text else "") | |
print(f"Saving text for file hash {file_hash} to {text_file_path}...") | |
with open(text_file_path, 'w') as text_file: | |
text_file.write(final_text) | |
def get_text(file_hash): | |
file_path = find_file_by_hash(file_hash) | |
if file_path: | |
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') | |
if os.path.exists(text_file_path): | |
print(f"Getting text for file hash {file_hash} from {text_file_path}...") | |
with open(text_file_path) as text_file: | |
return text_file.read() | |
return "" | |
def update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward=True, save_and_overwrite_changes=False): | |
global current_index, file_array, hash_list | |
if current_text or not save_and_overwrite_changes: # Save only if there's something to save or if saving changes is not required | |
file_hash = hash_list[current_index] | |
save_text(file_hash, current_text, prepend_text, append_text, save_and_overwrite_changes) | |
if navigate_forward: | |
current_index = (current_index + 1) % len(hash_list) # Cycle to the first item if at the end | |
else: | |
current_index = (current_index - 1) % len(hash_list) # Cycle to the last item if at the beginning | |
def get_file(navigate_forward, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): | |
global current_index, file_array, hash_list | |
print(f"Getting file, navigate_forward: {navigate_forward}...") | |
update_index_for_navigation(current_text, prepend_text, append_text, navigate_forward, save_and_overwrite_changes) | |
if current_index < len(hash_list): | |
current_hash = hash_list[current_index] | |
file_info = next((item for item in file_array if item["hash"] == current_hash), None) | |
if file_info: | |
file_path = find_file_by_hash(current_hash) | |
if file_path: | |
with Image.open(file_path) as img: | |
# Calculate the new height and width to maintain aspect ratio, with a max height of 768 for display in gradio | |
aspect_ratio = img.width / img.height | |
new_height = min(768, img.height) # Set max height to 768 for gradio display | |
new_width = int(aspect_ratio * new_height) | |
# Use Image.Resampling.LANCZOS for better quality resizing | |
img_resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
img_resized_bytes = io.BytesIO() | |
img_resized.save(img_resized_bytes, format='PNG') | |
img_resized_bytes = img_resized_bytes.getvalue() | |
text = get_text(current_hash) # Load text file contents into textbox | |
if not pause_api_call: | |
text = get_image_description(img, custom_prompt) # Get new description from OpenAI | |
save_text(current_hash, text, prepend_text, append_text, save_and_overwrite_changes) # Optionally save the new description | |
# Convert the resized image to a numpy array for display | |
img_resized_np = np.array(Image.open(io.BytesIO(img_resized_bytes))) | |
return img_resized_np, current_hash, file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), os.path.relpath(find_file_by_hash(current_hash).replace(os.path.splitext(find_file_by_hash(current_hash))[1], '.txt'), start=image_folder) | |
return None, "File not found", "β³ Pending", "", "", "", "", "", "", "" | |
def move_file(direction, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): | |
global current_index, file_array, hash_list | |
print(f"Moving file in direction {direction}...") | |
if current_index < len(hash_list): | |
current_hash = hash_list[current_index] | |
file_info = next((item for item in file_array if item["hash"] == current_hash), None) | |
if file_info: | |
source_file = find_file_by_hash(current_hash) | |
if source_file: | |
destination = thumbs_up_folder if direction == "up" else thumbs_down_folder if direction == "down" else None | |
new_status = "π Rated Up" if direction == "up" else "π Rated Down" if direction == "down" else None | |
if destination: | |
shutil.move(source_file, os.path.join(destination, os.path.basename(source_file))) | |
file_info["status"] = new_status | |
# Get new description and save it | |
with Image.open(source_file) as img: | |
description = get_image_description(img, custom_prompt) | |
save_text(current_hash, description, prepend_text, append_text, save_and_overwrite_changes) | |
return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt) | |
return None, "Invalid direction or file not found", "β³ Pending", "", "", "", "", "", "", "" | |
def reset_files(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): | |
global file_array, current_index, hash_list | |
print("Resetting files and clearing text...") | |
for file_info in file_array: | |
source_file = find_file_by_hash(file_info["hash"]) | |
if source_file and os.path.exists(source_file): | |
source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt') | |
if os.path.exists(source_text): | |
shutil.move(source_text, os.path.join(image_folder, os.path.basename(source_text))) | |
shutil.move(source_file, os.path.join(image_folder, os.path.basename(source_file))) | |
with open(os.path.join(image_folder, os.path.basename(source_text)), 'w') as text_file: | |
text_file.write("") | |
file_info["status"] = "β³ Pending" | |
file_array = build_file_array() | |
hash_list = [file['hash'] for file in file_array] | |
update_hash_to_path_cache() | |
return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt) | |
def delete_file(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): | |
global current_index, file_array, hash_list | |
print("Deleting file...") | |
if current_index < len(hash_list): | |
current_hash = hash_list[current_index] | |
source_file = find_file_by_hash(current_hash) | |
if source_file: | |
source_text = source_file.replace(os.path.splitext(source_file)[1], '.txt') | |
os.remove(source_file) | |
os.remove(source_text) | |
hash_list.remove(current_hash) # Remove the hash from the hash_list | |
file_array = [file for file in file_array if file["hash"] != current_hash] # Rebuild file_array without the deleted file | |
if current_index >= len(hash_list): current_index = len(hash_list) - 1 | |
update_hash_to_path_cache() | |
return get_file(True, current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt) | |
def load_image_details(): | |
print("Loading image details...") | |
image_details = [] | |
for folder in folders_cycle: | |
for file_path in get_files(folder): | |
file_hash = hash_image_pixels(file_path) | |
status = "π Rated Up" if folder == thumbs_up_folder else "π Rated Down" if folder == thumbs_down_folder else "β³ Pending" | |
text_file_path = file_path.replace(os.path.splitext(file_path)[1], '.txt') | |
text_content = "" | |
if os.path.exists(text_file_path): | |
with open(text_file_path, 'r') as text_file: | |
text_content = text_file.read() | |
image_details.append({ | |
"Text Content": text_content, | |
"Rating Status": status, | |
"File Hash": file_hash, | |
"Image Path": file_path, | |
"Text Path": text_file_path if os.path.exists(text_file_path) else "N/A" | |
}) | |
return pd.DataFrame(image_details) | |
def load_text_files_as_df(): | |
print("Loading text files into dataframe...") | |
text_files = [] | |
for file in os.listdir(image_folder): | |
if file.endswith('.txt'): | |
file_path = os.path.join(image_folder, file) | |
with open(file_path, 'r') as text_file: | |
text_content = text_file.read() | |
text_files.append({ | |
"File Name": file.replace('.txt', ''), | |
"Content": text_content, | |
"Date Modified": datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%d %H:%M:%S') | |
}) | |
return pd.DataFrame(text_files) | |
# Function to merge text_files_df and image_details_df into a single dataframe | |
def load_combined_details(): | |
print("Loading text file details...") | |
text_df = load_text_files_as_df() | |
image_details_df = load_image_details() | |
# Ensure the 'File Name' column exists in image_details_df by extracting it from 'Image Path' | |
image_details_df['File Name'] = image_details_df['Image Path'].apply(lambda x: os.path.basename(x).replace(os.path.splitext(os.path.basename(x))[1], '')) | |
combined_df = pd.merge(text_df, image_details_df, on="File Name", how="outer") | |
# Reorder dataframe fields according to specified order: Date Modified, Rating Status, File Name, Text Content, File Hash, Text Path, Image Path | |
combined_df = combined_df[['Date Modified', 'Rating Status', 'File Name', 'Text Content', 'File Hash', 'Text Path', 'Image Path']] | |
return combined_df | |
def load_gallery(): | |
print("Loading gallery...") | |
return sorted([os.path.join(image_folder, f) for f in os.listdir(image_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()) | |
def load_thumbs_up_gallery(): | |
print("Loading Thumbs Up gallery...") | |
return sorted([os.path.join(thumbs_up_folder, f) for f in os.listdir(thumbs_up_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()) | |
def load_thumbs_down_gallery(): | |
print("Loading Thumbs Down gallery...") | |
return sorted([os.path.join(thumbs_down_folder, f) for f in os.listdir(thumbs_down_folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower()) | |
def load_all_folders_gallery(): | |
print("Loading all folders gallery...") | |
all_images = [] | |
for folder in folders_cycle: | |
all_images.extend(sorted([os.path.join(folder, f) for f in os.listdir(folder) if os.path.splitext(f)[1].lower() in allowed_extensions], key=lambda x: os.path.basename(x).lower())) | |
return all_images | |
def load_text_files(): | |
print("Loading text files...") | |
return sorted([[file, open(os.path.join(image_folder, file),'r').read()] for file in os.listdir(image_folder) if file.endswith('.txt')], key=lambda x: x[0].lower()) | |
def update_text_content(df): | |
global text_files_df | |
print("Updating text content...") | |
for row in df: | |
if len(row) == 3: | |
file_name, new_content, date_added = row | |
file_path = os.path.join(image_folder, file_name) | |
if os.path.exists(file_path): | |
file_hash = hash_file(file_path) | |
save_text(file_hash, new_content) | |
else: | |
print("Error: Unexpected number of values in row. Expected 3 values per row.") | |
# Step 1: Define a function to update the dataframe | |
def refresh_text_files(): | |
print("Refreshing text files...") | |
updated_text_files = sorted([[file, open(os.path.join(image_folder, file),'r').read()] for file in os.listdir(image_folder) if file.endswith('.txt')], key=lambda x: x[0].lower()) | |
return updated_text_files | |
def save_uploaded_image(image_file): | |
""" | |
Attempts to save the uploaded image to the designated image folder. Returns the path of the saved image or None if an error occurs. | |
""" | |
try: | |
os.makedirs(image_folder, exist_ok=True) # Ensure the image folder exists | |
image_path = os.path.join(image_folder, image_file.name) | |
with open(image_path, "wb") as file: | |
file.write(image_file.content) # Write the byte content of the file | |
print(f"Image saved successfully: {image_path}") | |
return image_path | |
except Exception as e: | |
print(f"Error saving image: {e}") | |
return None | |
def create_text_file_for_image(image_path): | |
""" | |
Generates an empty text file corresponding to an uploaded image, using the same base name. | |
""" | |
text_file_path = f"{os.path.splitext(image_path)[0]}.txt" | |
open(text_file_path, 'w').close() # Efficiently create an empty text file | |
return text_file_path | |
def handle_image_upload(uploaded_files): | |
status_messages = [] | |
for uploaded_file in uploaded_files: | |
file_extension = os.path.splitext(uploaded_file.name)[1].lower() | |
if file_extension in allowed_extensions: | |
# Ensure the image folder exists | |
os.makedirs(image_folder, exist_ok=True) | |
# Save the file with a unique name based on its hash | |
unique_name = hashlib.sha256(uploaded_file.name.encode()).hexdigest() + file_extension | |
save_path = os.path.join(image_folder, unique_name) | |
with open(save_path, "wb") as file: | |
file.write(uploaded_file.content) | |
# Optionally, create a corresponding text file | |
text_file_path = save_path + ".txt" | |
with open(text_file_path, "w") as text_file: | |
text_file.write(f"Image file: {unique_name} uploaded successfully.") | |
status_messages.append(f"Uploaded and saved {uploaded_file.name} successfully.") | |
else: | |
status_messages.append(f"File {uploaded_file.name} has an unsupported extension ({file_extension}) and was not uploaded.") | |
return "\n".join(status_messages) | |
def refresh_image_description(current_text, prepend_text, append_text, pause_api_call, save_and_overwrite_changes, custom_prompt): | |
global current_index, file_array | |
if not pause_api_call: | |
file_info = file_array[current_index] | |
file_path = find_file_by_hash(file_info["hash"]) | |
if file_path: | |
with Image.open(file_path) as img: | |
# Use the custom_prompt parameter when calling get_image_description | |
text = get_image_description(img, custom_prompt) | |
save_text(file_info["hash"], text, prepend_text, append_text, save_and_overwrite_changes) | |
return np.array(img), file_info["hash"], file_info["status"], text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), os.path.relpath(find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), start=image_folder) | |
# If API call is paused or file not found, return current state without changes | |
return None, file_info["hash"], file_info["status"], current_text, os.path.basename(file_path), file_path, os.path.relpath(file_path, start=image_folder), os.path.basename(file_path).replace(os.path.splitext(os.path.basename(file_path))[1], '.txt'), find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), os.path.relpath(find_file_by_hash(file_info["hash"]).replace(os.path.splitext(find_file_by_hash(file_info["hash"]))[1], '.txt'), start=image_folder) | |
# Backup functionality for successful app launches | |
backup_folder = "Backup_Scripts" | |
os.makedirs(backup_folder, exist_ok=True) | |
def backup_script(): | |
""" | |
This function creates a backup of the current script in the designated backup folder. | |
""" | |
current_script_path = os.path.realpath(__file__) | |
backup_script_path = os.path.join(backup_folder, f"backup_{os.path.basename(current_script_path)}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.py") | |
shutil.copy2(current_script_path, backup_script_path) | |
print(f"Backup of the script created at {backup_script_path}") | |
def app_launch_success(): | |
""" | |
This function is called when the app successfully launches. | |
It triggers the backup of the current script. | |
""" | |
print("App successfully launched. Creating a backup of the script...") | |
backup_script() | |
def refresh_all(): | |
combined_details = load_combined_details() | |
image_gallery_content = load_gallery() | |
thumbs_up_gallery_content = load_thumbs_up_gallery() | |
# Reinitialize the hash array and the index for new images | |
update_hash_to_path_cache() | |
refresh_file_array_and_hashes() | |
current_index = 0 # Reset the index to start from the first image | |
return combined_details, image_gallery_content, thumbs_up_gallery_content | |
def combined_action(prompt, steps, model, styles, custom_prompt): | |
# Trigger the OpenAI API for description and wait for the response | |
description, status = trigger_openai_api(custom_prompt) | |
if status == "success": | |
# Once the response is received, generate the image based on the prompt and other parameters | |
generated_image, log = generate_image(prompt, steps, model, styles) | |
else: | |
generated_image, log = None, "Failed to generate image due to API call failure." | |
return generated_image, log, description | |
# π Setup Environment π | |
import gradio as gr | |
import os | |
from pathlib import Path | |
import subprocess | |
# Define the directory to store uploaded images | |
image_folder = "./Images" | |
os.makedirs(image_folder, exist_ok=True) # Create the directory if it doesn't exist | |
# π Function Definitions π | |
def save_and_show_images(uploaded_files): | |
images_to_display = [] | |
# Process each uploaded file | |
for index, uploaded_file in enumerate(uploaded_files): | |
# Define a base file name with index | |
base_file_name = f"Img-{index}" | |
# Check for existing files and increment index to avoid overwriting | |
existing_files = [f for f in Path(image_folder).rglob('*') if f.is_file() and f.suffix in ['.jpg', '.png', '.txt']] | |
while any(base_file_name in file.name for file in existing_files): | |
index += 1 | |
base_file_name = f"Img-{index}" | |
# Determine the file extension (jpg or png) | |
file_extension = "jpg" if uploaded_file[:3] == b'\xff\xd8\xff' else "png" | |
final_file_name = f"{base_file_name}.{file_extension}" | |
image_path = os.path.join(image_folder, final_file_name) | |
# Write the bytes to a new file in the specified directory | |
with open(image_path, "wb") as file: | |
file.write(uploaded_file) | |
# Create a corresponding text file for the image | |
text_file_path = os.path.join(image_folder, f"{base_file_name}.txt") | |
with open(text_file_path, "w") as text_file: | |
text_file.write(f"Image file: {final_file_name}") | |
# Add the path (or ideally, a URL to the file) to the list to display | |
images_to_display.append(image_path) | |
# Ensure all images have a corresponding text file | |
for image_file_path in Path(image_folder).rglob('*'): | |
if image_file_path.suffix in ['.jpg', '.png']: | |
base_name = os.path.splitext(image_file_path.name)[0] | |
text_file_path = os.path.join(image_folder, f"{base_name}.txt") | |
if not os.path.exists(text_file_path): | |
with open(text_file_path, "w") as text_file: | |
text_file.write(f"Image file: {image_file_path.name}") | |
return images_to_display | |
with gr.Blocks() as app: | |
with gr.Row(): | |
upload_btn = gr.File(label="Upload Images", type="binary", file_count='multiple') | |
gallery = gr.Gallery(label="Uploaded Images Gallery") | |
upload_btn.change(fn=save_and_show_images, inputs=upload_btn, outputs=gallery) | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("## Image Viewer") | |
with gr.Column(): | |
# Add an upload button for uploading files to the images folder | |
image = gr.Image(height=768, label="Current Image", image_mode="RGBA", width="max", elem_id="current_image", type="numpy") | |
with gr.Row(): | |
thumbs_down_btn = gr.Button("π Rate Down") | |
thumbs_up_btn = gr.Button("π Rate Up") | |
with gr.Row(): | |
prev_btn = gr.Button("β¬ Previous") | |
next_btn = gr.Button("Next β‘") | |
with gr.Row(): | |
status_display = gr.Textbox(label="Rating Status", interactive=False) | |
image_path_display = gr.Textbox(label="Image Path", interactive=False) | |
with gr.Column(): | |
# Existing setup for checkboxes | |
with gr.Row(): | |
pause_api_call_checkbox = gr.Checkbox(label="Pause OpenAI API Calls", value=True) | |
save_and_overwrite_changes_checkbox = gr.Checkbox(label="Save and Overwrite Changes", value=True) | |
# New textbox for custom OpenAI API instructions | |
custom_instructions = gr.Textbox(label="Custom Instructions", value="Use the art of deduction and creativity to generate a persona profile and 3 inspiration words to describe the image without describing the subject. Wrap it up with a concise 1 sentence caption of the image with the subject in high detail. JSON Format needed: Futuristic Persona Profile: The subject exudes a sense of readiness and authority, dressed in attire that hints at a future dominated by advanced technology and interstellar travel. This character could be envisioned as a commander or a pioneer in a futuristic saga, marked by their composed nature and the streamlined design of their gear. Futuristic Caption: A resolute figure of tomorrow, standing firm with a serene resolve, garbed in a sleek, technologically superior suit that narrates tales of distant realms and adventures. Poised Persona Profile: The individual appears as a beacon of confidence and tactical acumen. Their outfit suggests a world of progressive technology and potential space conquests. This persona might be a strategist or a guardian in a speculative fiction setting, distinguished by their steady presence and the modernistic cut of their uniform. Poised Caption: A visionary sentinel of the cosmos, poised with a calm yet assertive demeanor, dressed in an advanced, form-enhancing suit that hints at the mysteries of the universe yet to unfold. Advanced Persona Profile: The figure stands as a symbol of assurance and innovation, clad in a costume that forecasts an era of sophisticated technology and cosmic exploration. This character could represent an expert or a defender in a futuristic tale, highlighted by their tranquil posture and the elegant configuration of their attire. Advanced Caption: An unwavering pioneer of the future, the individual poses with a composed assurance, enveloped in a cutting-edge suit that speaks of advanced civilizations and uncharted frontiers.", placeholder="Enter custom instructions...", lines=2, max_lines=5) | |
with gr.Accordion("π Thumbs Up Gallery", open=False): | |
thumbs_up_gallery = gr.Gallery(label="Thumbs Up Image Gallery", value=load_thumbs_up_gallery(), every=5, columns=3, rows=1, object_fit="contain", height="auto") | |
with gr.Row(): | |
text_display = prompt = gr.Textbox(label="Image Analysis", lines=3, max_lines=10, interactive=True, placeholder="API call paused. Manually enter description.") | |
output_image = gr.Image(type="pil", label="Visual Analysis Analog") | |
with gr.Row(): | |
trigger_api_btn = gr.Button("Analyze Aesthetic!") | |
combined_btn = gr.Button("Analyze & Generate") # New combined action button | |
generate_btn = gr.Button("Generate") | |
analyze_all_btn = gr.Button("Analyze All Thumbs Up") | |
descriptions_display = gr.Dataframe() | |
status_display = gr.Textbox(label="Status", lines=10, interactive=False) | |
analyze_all_btn.click(analyze_thumbs_up_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display]) | |
with gr.Row(): | |
analyze_everything_btn = gr.Button("Analyze All Images") | |
analyze_all_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display]) | |
analyze_everything_btn.click(analyze_all_images, inputs=[custom_instructions], outputs=[descriptions_display, status_display]) | |
with gr.Row(): | |
gr.Image(type="pil") | |
gr.Image(type="pil") | |
gr.Image(type="pil") | |
with gr.Accordion("Append / Prepend", open=False): | |
with gr.Row(): | |
prepend_text = gr.Textbox(label="Prepend", lines=2, max_lines=5, placeholder="Enter text to prepend...") | |
append_text = gr.Textbox(label="Append", lines=2, max_lines=5, placeholder="Enter text to append...") | |
def trigger_openai_api(custom_prompt): | |
global current_index, file_array | |
if current_index < len(file_array): | |
file_info = file_array[current_index] | |
file_path = find_file_by_hash(file_info["hash"]) | |
if file_path: | |
with Image.open(file_path) as img: | |
# Use the custom_prompt parameter when calling get_image_description | |
text = get_image_description(img, custom_prompt) | |
save_text(file_info["hash"], text, "", "", True) # Assuming you want to save and overwrite changes by default | |
return text, "Triggered OpenAI API successfully." | |
return "", "Failed to trigger OpenAI API." | |
trigger_api_btn.click(trigger_openai_api, inputs=[custom_instructions], outputs=[text_display, status_display]) | |
with gr.Row(): | |
delete_btn = gr.Button("ποΈ Delete") | |
reset_btn = gr.Button("π Reset All Ratings ππ") | |
with gr.Accordion(label="Edit Project", open=False): | |
with gr.Column(scale=0): | |
file_input = gr.File(file_count="multiple", type="binary", label="Upload Images or Zip Files") | |
with gr.Row(): | |
previous_session_button = gr.Button("π Previous Project") | |
next_session_button = gr.Button("Next Projectπ") | |
with gr.Row(): | |
next_img = gr.Button("πΌοΈ Show Next Image") | |
prev_img = gr.Button("πΌοΈ Show Previous Image") | |
with gr.Row(): | |
with gr.Row(): | |
clear_button = gr.Button("ποΈ Clear Uploads Folder") | |
undo_button = gr.Button("β©οΈ Undo Last Deletion") | |
process_zip_button = gr.Button("π Process Zip Files") | |
with gr.Column(): | |
randomize_checkbox = gr.Checkbox(label="π Randomize Every 2 Seconds", value=True) | |
refresh_time_input = gr.Number(label="β±οΈ Refresh Time", value=2) | |
index_display = gr.Textbox(value=str(project_index), label="π’ Session Index", every=5) | |
file_input.change(process_files, inputs=[file_input], outputs=[]) | |
clear_button.click(clear_uploads_folder, inputs=[], outputs=[]) | |
undo_button.click(undo_last_deletion, inputs=[], outputs=[]) | |
next_session_button.click(next_session, inputs=[], outputs=[index_display]) | |
previous_session_button.click(previous_session, inputs=[], outputs=[index_display]) | |
next_img.click(get_next_image, inputs=[], outputs=[gallery]) | |
prev_img.click(get_previous_image, inputs=[], outputs=[gallery]) | |
process_zip_button.click(process_all_zips, inputs=[], outputs=[]) | |
with gr.Accordion("Advanced Generation Settings", open=False): | |
with gr.Column(): | |
steps = gr.Slider(minimum=1, maximum=32, value=16, label="Steps") | |
model = gr.Dropdown(choices=['TurboAnime.saftensors', 'OtherModel'], value='TurboAnime.saftensors', label="Model") | |
styles = gr.CheckboxGroup(choices=['RayORender', 'OtherStyle'], value=['RayORender'], label="Styles") | |
output_log = gr.Textbox(label="Log") | |
# Step 2: Add a button to trigger the update | |
generate_btn.click(fn=generate_image, inputs=[prompt, steps, model, styles], outputs=[output_image, output_log]) | |
# Set the click action for the combined button | |
combined_btn.click(combined_action, inputs=[prompt, steps, model, styles, custom_instructions], outputs=[output_image, output_log, text_display]) | |
with gr.Accordion("Training Data", open=True): | |
refresh_btn = gr.Button("Refresh") | |
with gr.Row(): | |
combined_details_df = gr.Dataframe(label="Combined Text and Image Details", value=load_combined_details(), headers=["Date Modified", "Rating Status", "File Name", "Text Content", "File Hash", "Text Path", "Image Path"], datatype=["str", "str", "str", "str", "str", "str", "str"], every=5) | |
gallery = gr.Gallery(label="Image Gallery", value=load_gallery(), every=5) | |
with gr.Accordion("File Info", open=False): | |
file_hash_display = gr.Textbox(label="File Hash", interactive=False) | |
text_path_display = gr.Textbox(label="Text Path", interactive=False) | |
def delete_all_text_files_content(): | |
text_files = [os.path.join(image_folder, f) for f in os.listdir(image_folder) if f.endswith('.txt')] | |
for file_path in text_files: | |
with open(file_path, 'w') as f: | |
f.write('') # Clear the content of the text file | |
return "All text files' content deleted." | |
with gr.Row(): | |
# Add a button to manually trigger a backup of the script | |
backup_btn = gr.Button("Backup Script") | |
backup_btn.click(backup_script, inputs=[], outputs=[]) | |
delete_all_text_btn = gr.Button("Delete All Text Files Content") | |
delete_all_text_btn.click(delete_all_text_files_content, inputs=[], outputs=status_display) | |
# Modify the button click actions to include the pause_api_call_checkbox state and the save_and_overwrite_changes_checkbox state as arguments | |
reset_btn.click(lambda t, pt, at, p, s, ci: reset_files(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) | |
prev_btn.click(lambda t, pt, at, p, s, ci: get_file(False, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) | |
next_btn.click(lambda t, pt, at, p, s, ci: get_file(True, t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) | |
thumbs_up_btn.click(lambda t, pt, at, p, s, ci: move_file("up", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) | |
thumbs_down_btn.click(lambda t, pt, at, p, s, ci: move_file("down", t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) | |
delete_btn.click(lambda t, pt, at, p, s, ci: delete_file(t, pt, at, p, s, ci), inputs=[text_display, prepend_text, append_text, pause_api_call_checkbox, save_and_overwrite_changes_checkbox, custom_instructions], outputs=[image, file_hash_display, status_display, text_display, image_path_display, text_path_display]) | |
refresh_btn.click(refresh_all, inputs=[], outputs=[combined_details_df, gallery, thumbs_up_gallery]) | |
# Check for successful app launch and backup the script | |
try: | |
app.launch(share=True, server_port=7866, server_name="0.0.0.0") | |
app_launch_success() | |
except Exception as e: | |
print(f"Error launching the app: {e}") | |