"""ImageHarvester: a Streamlit app that lists the <img> sources found on one
or more web pages and bundles the user's selection into a ZIP download."""

import io
import mimetypes
import re
import urllib.parse
import zipfile

import requests
import streamlit as st
from bs4 import BeautifulSoup

# Page configuration
st.set_page_config(page_title="ImageHarvester", layout="wide")

# Custom CSS
# NOTE(review): the stylesheet body is empty here, so this call currently
# injects nothing -- confirm whether a <style> block is supposed to be present.
st.markdown(""" """, unsafe_allow_html=True)

st.title("ImageHarvester")

# Seconds to wait on any single HTTP request before giving up.
REQUEST_TIMEOUT = 10

USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
)

# Compiled once at import time instead of on every validation call.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
    r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)

# Shared requests session, reused by every fetch and download in this run.
session = requests.Session()

# Initialize session state for URLs
if 'urls' not in st.session_state:
    st.session_state.urls = ['']


def add_url():
    """Append an empty URL row to the list kept in session state."""
    st.session_state.urls.append('')


def remove_url(index):
    """Remove the URL row at position ``index`` from session state."""
    st.session_state.urls.pop(index)


def is_valid_url(url):
    """Return True when ``url`` matches the module's URL pattern."""
    return _URL_PATTERN.match(url) is not None


def get_file_extension(content_type):
    """Map a MIME type to a file extension, falling back to ``'.jpg'``."""
    extension = mimetypes.guess_extension(content_type)
    return extension if extension else '.jpg'


def _build_headers(referer):
    """Return the request headers used for all requests, with the given Referer."""
    return {
        'User-Agent': USER_AGENT,
        'Accept-Language': 'en-US,en;q=0.9',
        'Referer': referer,
    }


def _site_root(resource_url):
    """Return ``scheme://host/`` for ``resource_url``."""
    parts = urllib.parse.urlsplit(resource_url)
    return f"{parts.scheme}://{parts.netloc}/"


def fetch_images(url, max_images):
    """Collect up to ``max_images`` image URLs from the page at ``url``.

    Args:
        url: Page address to scan for ``<img>`` tags.
        max_images: Upper bound on the number of image URLs returned.

    Returns:
        A list of absolute http(s) image URLs. The list is empty when the
        URL is invalid, the request fails, or the page has no usable images;
        each of those cases is reported in the UI.
    """
    if not is_valid_url(url):
        st.warning(f"Invalid URL: {url}")
        return []

    try:
        # A timeout keeps an unresponsive host from blocking the app forever.
        response = session.get(
            url, headers=_build_headers(url), timeout=REQUEST_TIMEOUT
        )
    except requests.exceptions.RequestException as e:
        st.error(f"An error occurred for {url}: {str(e)}")
        return []

    st.info(f"Status code for {url}: {response.status_code}")
    if response.status_code != 200:
        st.warning(
            f"Unexpected status code for {url}: {response.status_code}. "
            "Attempting to proceed anyway."
        )

    soup = BeautifulSoup(response.content, 'html.parser')
    img_tags = soup.find_all('img')
    if not img_tags:
        st.warning(f"No images found on {url}.")
        return []

    images = []
    for img in img_tags:
        # Cap on collected images, so tags without a usable src do not
        # consume the quota.
        if len(images) >= max_images:
            break
        img_url = img.get('src')
        if not img_url:
            continue
        if not img_url.startswith(('http://', 'https://')):
            img_url = urllib.parse.urljoin(url, img_url)
        # Inline data: URIs and other non-HTTP schemes cannot be fetched
        # by the downloader, so leave them out.
        if urllib.parse.urlsplit(img_url).scheme not in ('http', 'https'):
            continue
        images.append(img_url)
    return images


def download_images(selected_images):
    """Download the given image URLs into an in-memory ZIP archive.

    An image that cannot be retrieved is skipped with a warning so that one
    bad URL does not discard the rest of the selection.

    Args:
        selected_images: Iterable of image URLs to download.

    Returns:
        An ``io.BytesIO`` positioned at the start of the ZIP data, or
        ``None`` when no image could be downloaded.
    """
    zip_buffer = io.BytesIO()
    saved = 0
    with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
        for i, img_url in enumerate(selected_images):
            try:
                # The Referer is derived from the image URL itself; the page
                # URL is not available in this function.
                img_response = session.get(
                    img_url,
                    headers=_build_headers(_site_root(img_url)),
                    timeout=REQUEST_TIMEOUT,
                )
                img_response.raise_for_status()
            except requests.exceptions.RequestException as e:
                st.warning(f"Skipping {img_url}: {str(e)}")
                continue

            content_type = (
                img_response.headers.get('content-type', '').split(';')[0].strip()
            )
            file_extension = get_file_extension(content_type)
            file_name = f'image_{i+1}{file_extension}'
            zip_file.writestr(file_name, img_response.content)
            saved += 1

    if not saved:
        st.error(
            "An error occurred while downloading images: "
            "none of the selected images could be retrieved."
        )
        return None

    zip_buffer.seek(0)
    return zip_buffer


# Input fields for URLs
st.subheader("Enter Website URLs")
for i, url in enumerate(st.session_state.urls):
    col1, col2 = st.columns([10, 1])
    with col1:
        st.session_state.urls[i] = st.text_input(
            f"URL {i+1}",
            value=url,
            key=f"url_{i}",
            help="Enter the URL of the website from which you want to download images.",
            placeholder="https://example.com",
        )
    with col2:
        if st.button("Remove", key=f"remove_{i}"):
            remove_url(i)
            st.rerun()

if st.button("Add URL"):
    add_url()
    # The input rows above were already drawn for this run; rerun so the
    # newly added row shows up immediately.
    st.rerun()

max_images_per_url = st.number_input(
    "Max images per URL:", min_value=1, value=10, step=1
)

if st.button("Fetch Images", key="fetch"):
    all_images = []
    for raw_url in st.session_state.urls:
        page_url = raw_url.strip()
        if not page_url:
            # Blank rows are skipped without a warning.
            continue
        if not is_valid_url(page_url):
            st.warning(f"Invalid URL: {page_url}")
            continue
        with st.spinner(f"Fetching images from {page_url}..."):
            images = fetch_images(page_url, max_images_per_url)
            all_images.extend(images)

    if all_images:
        st.session_state.images = all_images
        st.session_state.selected_images = [False] * len(all_images)
        st.success(
            f"Found {len(all_images)} images in total. "
            "Select the images you want to download."
        )
    else:
        st.warning(
            "No images found or could not fetch images from any of the provided URLs."
        )

if 'images' in st.session_state:
    st.subheader("Fetched Images")

    # Buttons for Select All and Clear Selection
    col1, col2, _ = st.columns([1, 1, 1])
    with col1:
        if st.button("Select All"):
            st.session_state.selected_images = [True] * len(st.session_state.images)
    with col2:
        if st.button("Clear"):
            st.session_state.selected_images = [False] * len(st.session_state.images)

    # Lay the images out in a fixed number of columns, filled round-robin.
    num_cols = 4
    columns = st.columns(num_cols)

    selected_images = []
    for i, img_url in enumerate(st.session_state.images):
        checkbox_key = f"check_{i}"
        label = f"image_{i+1}"

        with columns[i % num_cols]:
            st.session_state.selected_images[i] = st.checkbox(
                "Select Image",
                key=checkbox_key,
                value=st.session_state.selected_images[i],
            )
            # NOTE(review): img_class is computed but not referenced by the
            # markdown below, which contains only the text label and no
            # markup -- confirm whether an <img> element is missing here.
            img_class = "selected" if st.session_state.selected_images[i] else ""
            st.markdown(f"""
{label}
{label}
""", unsafe_allow_html=True)

        if st.session_state.selected_images[i]:
            selected_images.append(img_url)

    if selected_images:
        if st.button("Download Selected Images"):
            with st.spinner("Preparing download..."):
                zip_buffer = download_images(selected_images)
            if zip_buffer:
                st.download_button(
                    label="Download ZIP",
                    data=zip_buffer,
                    file_name="selected_images.zip",
                    mime="application/zip",
                )
            else:
                st.error("Failed to prepare the download. Please try again.")
    else:
        st.info("Select one or more images to download.")