|
from typing import List, Union, Optional |
|
import os |
|
import requests |
|
import re |
|
import time |
|
import shutil |
|
import subprocess |
|
import pandas as pd |
|
from selenium import webdriver |
|
from selenium.webdriver.common.keys import Keys |
|
from selenium.webdriver.chrome.service import Service |
|
from selenium.webdriver.chrome.options import Options |
|
from bs4 import BeautifulSoup |
|
from selenium.webdriver.common.by import By |
|
from selenium.webdriver.support.ui import WebDriverWait |
|
from selenium.webdriver.support import expected_conditions as EC |
|
import chromedriver_autoinstaller |
|
|
|
|
|
class PatentDownloader: |
|
url = "https://patents.google.com" |
|
|
|
def __init__(self, verbose: bool = False): |
|
""" |
|
Parameters |
|
---------- |
|
verbose : bool |
|
Print additional debug information. |
|
""" |
|
self.verbose = verbose |
|
self.chrome_path = self.install_chrome() |
|
|
|
def install_chrome(self) -> str: |
|
""" |
|
Download and install Google Chrome dynamically. |
|
Returns |
|
------- |
|
str: Path to the Chrome binary. |
|
""" |
|
chrome_path = "/usr/bin/google-chrome" |
|
|
|
if not shutil.which("google-chrome"): |
|
print("Downloading and installing Google Chrome...") |
|
subprocess.run( |
|
"wget https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb -O chrome.deb", |
|
shell=True, |
|
check=True, |
|
) |
|
subprocess.run( |
|
"apt-get update && apt-get install -y ./chrome.deb", |
|
shell=True, |
|
check=True, |
|
) |
|
os.remove("chrome.deb") |
|
|
|
if not shutil.which("google-chrome"): |
|
raise ValueError("Google Chrome installation failed!") |
|
return chrome_path |
|
|
|
def download(self, patent: Union[str, List[str]], output_path: str = "./", |
|
waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None: |
|
""" |
|
Download patent document(s) as PDF. |
|
""" |
|
if isinstance(patent, list) or os.path.isfile(patent): |
|
self.get_pdfs(patent, output_path, waiting_time, remove_kind_codes) |
|
else: |
|
self.get_pdf(patent, output_path, waiting_time, remove_kind_codes) |
|
|
|
def get_pdf(self, patent: str, output_path: str = "./", waiting_time: int = 10, |
|
remove_kind_codes: Optional[List[str]] = None) -> None: |
|
""" |
|
Download a single patent PDF. |
|
""" |
|
if remove_kind_codes: |
|
for kind_code in remove_kind_codes: |
|
patent = re.sub(kind_code + "$", "", patent) |
|
|
|
|
|
chromedriver_autoinstaller.install() |
|
|
|
|
|
chrome_options = Options() |
|
chrome_options.binary_location = self.chrome_path |
|
chrome_options.add_argument("--headless") |
|
chrome_options.add_argument("--no-sandbox") |
|
chrome_options.add_argument("--disable-dev-shm-usage") |
|
|
|
|
|
service = Service() |
|
driver = webdriver.Chrome(service=service, options=chrome_options) |
|
pdf_link = None |
|
|
|
try: |
|
driver.get(self.url) |
|
|
|
|
|
print("Waiting for the search input field...") |
|
search_input_xpath = "//input[@aria-label='Search patents']" |
|
WebDriverWait(driver, 20).until( |
|
EC.presence_of_element_located((By.XPATH, search_input_xpath)) |
|
) |
|
element = driver.find_element(By.XPATH, search_input_xpath) |
|
print("Search input field located.") |
|
|
|
element.send_keys(patent) |
|
element.send_keys(Keys.RETURN) |
|
|
|
|
|
print("Waiting for search results to load...") |
|
WebDriverWait(driver, 20).until( |
|
EC.presence_of_element_located((By.TAG_NAME, "body")) |
|
) |
|
time.sleep(waiting_time) |
|
|
|
|
|
soup = BeautifulSoup(driver.page_source, "html.parser") |
|
pdf_link = self.get_pdf_link(soup, patent) |
|
except Exception as e: |
|
print(f"Error occurred: {e}") |
|
finally: |
|
driver.quit() |
|
|
|
|
|
if pdf_link: |
|
validate_directory(output_path) |
|
pdf_content = requests.get(pdf_link).content |
|
with open(os.path.join(output_path, f"{patent}.pdf"), "wb") as file: |
|
file.write(pdf_content) |
|
print(f">>> Patent {patent} successfully downloaded <<<") |
|
else: |
|
print(f"Error: PDF link for patent {patent} not found!") |
|
|
|
def get_pdfs(self, patents: Union[List[str], str], output_path: str = "./", |
|
waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None: |
|
""" |
|
Download multiple patent PDFs from a list or file. |
|
""" |
|
if isinstance(patents, str): |
|
if patents.lower().endswith('csv'): |
|
df_patents = pd.read_csv(patents) |
|
patents = df_patents['patent_number'].to_list() |
|
elif patents.lower().endswith('txt'): |
|
with open(patents, 'r') as txt_file: |
|
patents = txt_file.read().splitlines() |
|
else: |
|
raise NotImplementedError(f'Unsupported file type: {patents}') |
|
|
|
for i, patent in enumerate(patents): |
|
print(len(patents) - i, "patent(s) remaining.") |
|
self.get_pdf(patent, output_path, waiting_time, remove_kind_codes) |
|
|
|
@staticmethod |
|
def get_pdf_link(soup: BeautifulSoup, patent: str) -> Optional[str]: |
|
""" |
|
Extract the PDF link from parsed HTML. |
|
""" |
|
pdf_links = [link['href'] for link in soup.find_all('a', href=True) if link['href'].lower().endswith("pdf")] |
|
for link in pdf_links: |
|
if patent.lower() in link.lower(): |
|
return link |
|
return None |
|
|
|
|
|
def validate_directory(directory: str) -> None: |
|
""" |
|
Ensure the output directory exists. |
|
""" |
|
if not os.path.exists(directory): |
|
os.makedirs(directory) |
|
|