# File size: 6,259 Bytes
# Commit: 630b3cc
import os
import re
import shutil
import subprocess
import time
from typing import List, Optional, Union
from urllib.parse import urljoin

import chromedriver_autoinstaller
import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class PatentDownloader:
    """Download patent PDF documents from Google Patents via headless Chrome."""

    url = "https://patents.google.com"

    def __init__(self, verbose: bool = False):
        """
        Parameters
        ----------
        verbose : bool
            Print additional debug information.
        """
        self.verbose = verbose
        self.chrome_path = self.install_chrome()

    def install_chrome(self) -> str:
        """
        Ensure Google Chrome is available, installing it when missing.

        Returns
        -------
        str
            Path to the Chrome binary actually found on PATH.

        Raises
        ------
        ValueError
            If Chrome is still not on PATH after the install attempt.
        """
        chrome_path = shutil.which("google-chrome")
        if chrome_path:
            # Return the binary that was actually found instead of a
            # hard-coded /usr/bin path that may not exist on this system.
            return chrome_path
        print("Downloading and installing Google Chrome...")
        # NOTE(review): assumes a Debian-based system with root privileges
        # (apt-get) — confirm for the deployment environment.
        subprocess.run(
            [
                "wget",
                "https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb",
                "-O",
                "chrome.deb",
            ],
            check=True,
        )
        # The '&&' chaining requires a shell; the command is a fixed string
        # with no untrusted input.
        subprocess.run(
            "apt-get update && apt-get install -y ./chrome.deb",
            shell=True,
            check=True,
        )
        os.remove("chrome.deb")
        chrome_path = shutil.which("google-chrome")
        if not chrome_path:
            raise ValueError("Google Chrome installation failed!")
        return chrome_path

    def download(self, patent: Union[str, List[str]], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download patent document(s) as PDF.

        Parameters
        ----------
        patent : Union[str, List[str]]
            A single patent number, a list of numbers, or a path to a
            ``.csv``/``.txt`` file listing numbers.
        output_path : str
            Directory the PDFs are written to (created if missing).
        waiting_time : int
            Seconds to let the search-results page render.
        remove_kind_codes : Optional[List[str]]
            Kind-code suffixes (e.g. ``["B2"]``) stripped from each patent
            number before searching.
        """
        # A list or an existing file means "many patents"; anything else is
        # treated as a single patent number.
        if isinstance(patent, list) or os.path.isfile(patent):
            self.get_pdfs(patent, output_path, waiting_time, remove_kind_codes)
        else:
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

    def get_pdf(self, patent: str, output_path: str = "./", waiting_time: int = 10,
                remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download a single patent PDF.

        Parameters
        ----------
        patent : str
            Patent number to search for.
        output_path : str
            Directory the PDF is written to (created if missing).
        waiting_time : int
            Seconds to let the search-results page render.
        remove_kind_codes : Optional[List[str]]
            Kind-code suffixes stripped from the patent number first.
        """
        if remove_kind_codes:
            for kind_code in remove_kind_codes:
                # Anchor with '$' so the code is stripped only as a suffix.
                patent = re.sub(kind_code + "$", "", patent)
        # Install a ChromeDriver matching the local Chrome version.
        chromedriver_autoinstaller.install()
        chrome_options = Options()
        chrome_options.binary_location = self.chrome_path
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        driver = webdriver.Chrome(service=Service(), options=chrome_options)
        pdf_link = None  # stays None if the scrape fails
        try:
            driver.get(self.url)
            print("Waiting for the search input field...")
            search_input_xpath = "//input[@aria-label='Search patents']"
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, search_input_xpath))
            )
            element = driver.find_element(By.XPATH, search_input_xpath)
            print("Search input field located.")
            element.send_keys(patent)
            element.send_keys(Keys.RETURN)
            print("Waiting for search results to load...")
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            # Results render asynchronously; give the page time to settle.
            time.sleep(waiting_time)
            soup = BeautifulSoup(driver.page_source, "html.parser")
            pdf_link = self.get_pdf_link(soup, patent)
        except Exception as e:
            # Best-effort scrape: report and fall through to the
            # not-found path below rather than crashing a batch run.
            print(f"Error occurred: {e}")
        finally:
            driver.quit()
        if pdf_link:
            validate_directory(output_path)
            # Scraped hrefs may be relative; resolve against the site root.
            pdf_url = urljoin(self.url, pdf_link)
            # A timeout prevents the download from hanging indefinitely.
            response = requests.get(pdf_url, timeout=60)
            if response.ok:
                with open(os.path.join(output_path, f"{patent}.pdf"), "wb") as file:
                    file.write(response.content)
                print(f">>> Patent {patent} successfully downloaded <<<")
            else:
                # Don't write an HTTP error page to disk as a .pdf file.
                print(f"Error: download of {pdf_url} failed with status {response.status_code}!")
        else:
            print(f"Error: PDF link for patent {patent} not found!")

    def get_pdfs(self, patents: Union[List[str], str], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download multiple patent PDFs from a list, CSV file, or text file.

        Parameters
        ----------
        patents : Union[List[str], str]
            List of patent numbers, or path to a ``.csv`` file with a
            ``patent_number`` column, or a ``.txt`` file with one number
            per line.
        output_path : str
            Directory the PDFs are written to (created if missing).
        waiting_time : int
            Seconds to let each search-results page render.
        remove_kind_codes : Optional[List[str]]
            Kind-code suffixes stripped from each patent number first.

        Raises
        ------
        NotImplementedError
            If a file path with an unsupported extension is given.
        """
        if isinstance(patents, str):
            if patents.lower().endswith('csv'):
                # The CSV must expose a 'patent_number' column.
                df_patents = pd.read_csv(patents)
                patents = df_patents['patent_number'].to_list()
            elif patents.lower().endswith('txt'):
                with open(patents, 'r') as txt_file:
                    patents = txt_file.read().splitlines()
            else:
                raise NotImplementedError(f'Unsupported file type: {patents}')
        for i, patent in enumerate(patents):
            print(len(patents) - i, "patent(s) remaining.")
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

    @staticmethod
    def get_pdf_link(soup: BeautifulSoup, patent: str) -> Optional[str]:
        """
        Extract the first PDF hyperlink matching *patent* from parsed HTML.

        Parameters
        ----------
        soup : BeautifulSoup
            Parsed search-results page.
        patent : str
            Patent number, matched case-insensitively as a substring of
            the href.

        Returns
        -------
        Optional[str]
            The matching href, or None when no PDF link mentions the patent.
        """
        pdf_links = [
            link['href']
            for link in soup.find_all('a', href=True)
            # Require the '.pdf' extension (with dot) so hrefs that merely
            # end in the letters 'pdf' are not mistaken for documents.
            if link['href'].lower().endswith(".pdf")
        ]
        for link in pdf_links:
            if patent.lower() in link.lower():
                return link
        return None
def validate_directory(directory: str) -> None:
    """
    Ensure the output directory exists, creating parent directories as needed.

    Parameters
    ----------
    directory : str
        Path of the directory to create if missing.
    """
    # exist_ok=True avoids the check-then-create race of the
    # os.path.exists() + os.makedirs() pattern and is a no-op when the
    # directory already exists.
    os.makedirs(directory, exist_ok=True)