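"""Docs Scraper.

Map a website with the Firecrawl API, scrape every discovered URL, and save the
combined content as a single Markdown file named after the domain. A small Gradio
UI collects the base URL, the Firecrawl API key, and an optional rate limit.
"""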
import os
import asyncio
from dotenv import load_dotenv
from urllib.parse import urlparse
from firecrawl import FirecrawlApp
import gradio as gr

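# Load variables from a local .env file, if present (the API key itself is entered in the UI).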
load_dotenv()
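
# Module-level cancellation flag, shared between the scrape loop and the Cancel button.
cancel_event = None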

def get_firecrawl_app(api_key):
    return FirecrawlApp(api_key=api_key)

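# Scrape a single URL and return its Markdown content (empty string on failure).
# FirecrawlApp.scrape_url is a blocking HTTP call; this wrapper is async so it can be
# awaited from the Gradio handler, but the request itself is not run concurrently.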
async def async_scrape_url(app, url):
    try:
        scrape_status = app.scrape_url(url)
        print(f"Scrape status for {url}: {scrape_status}")
        if 'markdown' in scrape_status:
            return scrape_status['markdown']
        else:
            print(f"Failed to scrape {url}: {scrape_status}")
            return ""
    except Exception as e:
        print(f"Error scraping {url}: {e}")
        return ""

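# Ask Firecrawl to map the website and return the list of discovered URLs.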
def map_website(app, url):
    try:
        map_status = app.map_url(url)
        if isinstance(map_status, list):
            return map_status
        else:
            print("Failed to map the website:", map_status)
            return []
    except Exception as e:
        print(f"Error mapping website {url}: {e}")
        return []

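# Map the site, scrape every URL, and append the results to a single Markdown file
# named after the domain, honouring the optional rate limit and the cancellation event.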
async def scrape_all_urls(base_url, api_key, limit_rate, progress=gr.Progress(), cancel_event=None):
    try:
        app = get_firecrawl_app(api_key)
        urls = map_website(app, base_url)
        if not urls:
            return "No URLs found. Please check if the base URL is correct.", None

        parsed_url = urlparse(base_url)
        domain = parsed_url.netloc.replace("www.", "")
        os.makedirs('scraped_documentation', exist_ok=True)
        output_file = os.path.join('scraped_documentation', f"{domain}.md")

        with open(output_file, 'w', encoding='utf-8') as md_file:
            for i, url in enumerate(progress.tqdm(urls)):
                if cancel_event and cancel_event.is_set():
                    return "Scraping cancelled.", None
                progress(i / len(urls), f"Scraping {url}")
                markdown_content = await async_scrape_url(app, url)
                md_file.write(f"# {url}\n\n")
                md_file.write(markdown_content)
                md_file.write("\n\n---\n\n")
                if limit_rate and (i + 1) % 10 == 0:
                    # Pause after every 10 URLs to respect the free-tier limit of
                    # 10 scrapes per minute; asyncio.sleep keeps the event loop responsive.
                    await asyncio.sleep(60)

        return f"Scraping completed. Output saved to {output_file}", output_file
    except Exception as e:
        print(f"Error during scraping process: {e}")
        return f"Error during scraping process: {e}", None

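# Pre-flight check for the "Count URLs" button: report how many URLs the map step finds.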
def count_urls(base_url, api_key):
    try:
        if not api_key:
            return "Please enter your Firecrawl API key first."
        app = get_firecrawl_app(api_key)
        urls = map_website(app, base_url)
        if urls:
            return f"{len(urls)} URLs found. Do you want to proceed with scraping?"
        else:
            return "No URLs found. Please check the base URL or API key."
    except Exception as e:
        print(f"Error counting URLs: {e}")
        return f"Error counting URLs: {e}"

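# Handler for the "Scrape URLs" button.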
async def gradio_scrape(base_url, api_key, limit_rate, progress=gr.Progress()):
    global cancel_event
    try:
        if not api_key:
            return "Please enter your Firecrawl API key.", None
        if not base_url:
            return "Please enter a base URL to scrape.", None
        # Use the module-level event so the Cancel button can interrupt this run.
        cancel_event = asyncio.Event()
        result, file_path = await scrape_all_urls(base_url, api_key, limit_rate, progress, cancel_event)
        return result, file_path
    except Exception as e:
        print(f"Error in gradio_scrape: {e}")
        return f"Error in gradio_scrape: {e}", None

def cancel_scrape():
    try:
        # Called by the Cancel button: signal the running scrape loop (if any) to stop.
        if cancel_event:
            cancel_event.set()
        return "Cancelling scrape operation..."
    except Exception as e:
        print(f"Error cancelling scrape: {e}")
        return f"Error cancelling scrape: {e}"

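# Gradio UI: inputs for the base URL, API key and rate-limit toggle, plus count/scrape/cancel controls.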
with gr.Blocks() as iface:
    gr.Markdown("# Docs Scraper")
    gr.Markdown("""
    ## Map and Scrape Website URLs with Firecrawl API
    Enter a base URL, your Firecrawl API key, and choose whether to limit the scraping rate.
    Scraped content will be saved as a markdown file named after the domain.
    """)
    gr.HTML('Don\'t have an API key? <a href="https://firecrawl.dev/" target="_blank" rel="noopener noreferrer">Get one from Firecrawl</a>')
    
    with gr.Row():
        base_url = gr.Textbox(label="Base URL", placeholder="Enter the base URL to scrape")
        api_key = gr.Textbox(label="Firecrawl API Key", type="password")
        limit_rate = gr.Checkbox(
            label="Limit Rate", 
            value=True, 
            info="Enable to limit scraping to 10 URLs per minute. This adheres to Firecrawl API's free tier rate limit."
        )
    
    gr.Markdown("After entering your API key, click 'Count URLs' to determine the number of URLs to be scraped. Then, click 'Scrape URLs' to begin the process.")
    
    with gr.Row():
        with gr.Column(scale=1):
            count_button = gr.Button("Count URLs")
        with gr.Column(scale=1):
            url_count = gr.Textbox(label="URL Count")

    with gr.Row():
        with gr.Column(scale=1):
            scrape_button = gr.Button("Scrape URLs")
            cancel_button = gr.Button("Cancel Scrape")
        with gr.Column(scale=1):
            output = gr.Textbox(label="Output", elem_id="output_textbox")
    
    file_output = gr.File(label="Download Scraped Content")
    
    gr.Markdown("""
    #### Note: 
    The free tier of the Firecrawl API allows for 500 credits per month. 
    If you need to scrape more, consider upgrading to a paid plan.
    """)
    
    count_button.click(count_urls, inputs=[base_url, api_key], outputs=[url_count])
    scrape_button.click(gradio_scrape, inputs=[base_url, api_key, limit_rate], outputs=[output, file_output])
    cancel_button.click(cancel_scrape, outputs=[output])

if __name__ == "__main__":
    iface.launch()