DrishtiSharma committed on
Commit
630b3cc
·
verified ·
1 Parent(s): 5c4cff8

Create test.py

Browse files
Files changed (1) hide show
  1. test.py +170 -0
test.py ADDED
@@ -0,0 +1,170 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Union, Optional
2
+ import os
3
+ import requests
4
+ import re
5
+ import time
6
+ import shutil
7
+ import subprocess
8
+ import pandas as pd
9
+ from selenium import webdriver
10
+ from selenium.webdriver.common.keys import Keys
11
+ from selenium.webdriver.chrome.service import Service
12
+ from selenium.webdriver.chrome.options import Options
13
+ from bs4 import BeautifulSoup
14
+ from selenium.webdriver.common.by import By
15
+ from selenium.webdriver.support.ui import WebDriverWait
16
+ from selenium.webdriver.support import expected_conditions as EC
17
+ import chromedriver_autoinstaller
18
+
19
+
20
class PatentDownloader:
    """Download patent documents as PDF files from Google Patents.

    A headless Chrome session (driven through Selenium) searches for the
    patent number on the Google Patents site, the resulting page is parsed
    for a link ending in ``pdf`` that contains the patent number, and that
    link is fetched with ``requests``.
    """

    url = "https://patents.google.com"

    # Location of the Chrome package fetched when no Chrome binary is found.
    _CHROME_DEB_URL = (
        "https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb"
    )
    # Seconds to wait for page elements to appear in the browser.
    _PAGE_TIMEOUT = 20
    # Seconds before the PDF HTTP request is abandoned.
    _HTTP_TIMEOUT = 60

    def __init__(self, verbose: bool = False):
        """
        Parameters
        ----------
        verbose : bool
            Print additional debug information.
        """
        self.verbose = verbose
        self.chrome_path = self.install_chrome()

    def install_chrome(self) -> str:
        """
        Locate Google Chrome, downloading and installing it when missing.

        Returns
        -------
        str: Path to the Chrome binary as resolved on ``PATH``.

        Raises
        ------
        ValueError
            If ``google-chrome`` is still not on ``PATH`` after installing.
        subprocess.CalledProcessError
            If ``wget`` or ``apt-get`` exits with a non-zero status.
        FileNotFoundError
            If ``wget`` or ``apt-get`` is not available on this system.
        """
        chrome_path = shutil.which("google-chrome")
        if chrome_path:
            return chrome_path

        print("Downloading and installing Google Chrome...")
        deb_file = "chrome.deb"
        try:
            subprocess.run(
                ["wget", self._CHROME_DEB_URL, "-O", deb_file],
                check=True,
            )
            subprocess.run(["apt-get", "update"], check=True)
            subprocess.run(
                ["apt-get", "install", "-y", f"./{deb_file}"],
                check=True,
            )
        finally:
            # Never leave the package behind, even when a step above failed.
            if os.path.exists(deb_file):
                os.remove(deb_file)

        # Report the path that was actually found rather than assuming one.
        chrome_path = shutil.which("google-chrome")
        if not chrome_path:
            raise ValueError("Google Chrome installation failed!")
        return chrome_path

    def download(self, patent: Union[str, List[str]], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download patent document(s) as PDF.

        Parameters
        ----------
        patent : str or list of str
            A single patent number, a list of patent numbers, or the path of
            an existing ``csv``/``txt`` file listing patent numbers.
        output_path : str
            Directory the PDF files are written to.
        waiting_time : int
            Seconds to sleep after submitting the search before the page is
            parsed.
        remove_kind_codes : list of str, optional
            Suffixes removed from the end of each patent number.
        """
        if isinstance(patent, list) or os.path.isfile(patent):
            self.get_pdfs(patent, output_path, waiting_time, remove_kind_codes)
        else:
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

    def get_pdf(self, patent: str, output_path: str = "./", waiting_time: int = 10,
                remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download a single patent PDF.

        Failures while browsing or downloading are printed, not raised, so a
        batch run can continue with the next patent.

        Parameters
        ----------
        patent : str
            Patent number to search for.
        output_path : str
            Directory the PDF is written to; created when missing.
        waiting_time : int
            Seconds to sleep after submitting the search before the page is
            parsed.
        remove_kind_codes : list of str, optional
            Suffixes removed from the end of ``patent`` before searching.
        """
        patent = self._strip_kind_codes(patent, remove_kind_codes)

        # Automatically install ChromeDriver
        chromedriver_autoinstaller.install()

        # Set up Chrome options
        chrome_options = Options()
        chrome_options.binary_location = self.chrome_path
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")

        # Initialize Selenium WebDriver
        driver = webdriver.Chrome(service=Service(), options=chrome_options)
        pdf_link = None  # Stays None when anything below fails

        try:
            driver.get(self.url)

            # Wait for the search input field; the wait returns the element,
            # so no second lookup is needed.
            print("Waiting for the search input field...")
            search_input_xpath = "//input[@aria-label='Search patents']"
            element = WebDriverWait(driver, self._PAGE_TIMEOUT).until(
                EC.presence_of_element_located((By.XPATH, search_input_xpath))
            )
            print("Search input field located.")

            element.send_keys(patent)
            element.send_keys(Keys.RETURN)

            # Wait for search results to load
            print("Waiting for search results to load...")
            WebDriverWait(driver, self._PAGE_TIMEOUT).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            time.sleep(waiting_time)

            # Parse HTML and get the PDF link
            soup = BeautifulSoup(driver.page_source, "html.parser")
            pdf_link = self.get_pdf_link(soup, patent)
        except Exception as e:
            # Best-effort boundary: report and fall through to the
            # "not found" message below.
            print(f"Error occurred: {e}")
        finally:
            driver.quit()

        if not pdf_link:
            print(f"Error: PDF link for patent {patent} not found!")
            return

        # Download the PDF. A timeout keeps a stalled server from hanging the
        # run, and the status check keeps error pages from being saved as PDF.
        try:
            response = requests.get(pdf_link, timeout=self._HTTP_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Error: could not download PDF for patent {patent}: {e}")
            return

        validate_directory(output_path)
        with open(os.path.join(output_path, f"{patent}.pdf"), "wb") as file:
            file.write(response.content)
        print(f">>> Patent {patent} successfully downloaded <<<")

    def get_pdfs(self, patents: Union[List[str], str], output_path: str = "./",
                 waiting_time: int = 10, remove_kind_codes: Optional[List[str]] = None) -> None:
        """
        Download multiple patent PDFs from a list or file.

        Parameters
        ----------
        patents : list of str or str
            Patent numbers, or the path of a file listing them: a name
            ending in ``csv`` needs a ``patent_number`` column, a name ending
            in ``txt`` holds one patent number per line.
        output_path : str
            Directory the PDF files are written to.
        waiting_time : int
            Seconds to sleep per patent before the result page is parsed.
        remove_kind_codes : list of str, optional
            Suffixes removed from the end of each patent number.

        Raises
        ------
        NotImplementedError
            If ``patents`` is a string that ends in neither ``csv`` nor
            ``txt``.
        """
        if isinstance(patents, str):
            patents = self._load_patent_list(patents)

        total = len(patents)
        for i, patent in enumerate(patents):
            print(total - i, "patent(s) remaining.")
            self.get_pdf(patent, output_path, waiting_time, remove_kind_codes)

    @staticmethod
    def get_pdf_link(soup: "BeautifulSoup", patent: str) -> Optional[str]:
        """
        Extract the PDF link from parsed HTML.

        Returns the first ``href`` that ends in ``pdf`` and contains the
        patent number (both compared case-insensitively), or ``None`` when
        no anchor matches.
        """
        needle = patent.lower()
        for anchor in soup.find_all('a', href=True):
            href = anchor['href']
            lowered = href.lower()
            if lowered.endswith("pdf") and needle in lowered:
                return href
        return None

    @staticmethod
    def _strip_kind_codes(patent: str, remove_kind_codes: Optional[List[str]]) -> str:
        """Remove each given kind code from the end of the patent number."""
        if remove_kind_codes:
            for kind_code in remove_kind_codes:
                # NOTE: the kind code is interpreted as a regular expression,
                # as it always has been; plain codes such as "A1" are
                # unaffected by that.
                patent = re.sub(kind_code + "$", "", patent)
        return patent

    @staticmethod
    def _load_patent_list(path: str) -> List[str]:
        """Read patent numbers from a csv/txt file, dropping blank entries."""
        lowered = path.lower()
        if lowered.endswith('csv'):
            # dtype=str keeps purely numeric patent numbers as text; they are
            # later fed to re.sub and typed into the search box.
            column = pd.read_csv(path, dtype=str)['patent_number']
            entries = column.dropna().to_list()
        elif lowered.endswith('txt'):
            with open(path, 'r', encoding="utf-8") as txt_file:
                entries = txt_file.read().splitlines()
        else:
            raise NotImplementedError(f'Unsupported file type: {path}')

        # An empty entry would start a browser session that searches for
        # nothing, so blank lines and stray whitespace are discarded.
        stripped = (entry.strip() for entry in entries)
        return [entry for entry in stripped if entry]
163
+
164
+
165
def validate_directory(directory: str) -> None:
    """
    Ensure the output directory exists.

    Missing parent directories are created as well. Calling this for a
    directory that already exists is a no-op.

    Parameters
    ----------
    directory : str
        Path of the directory to create when it is missing.

    Raises
    ------
    FileExistsError
        If the path exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of testing os.path.exists
    # first: another process may create the directory in between.
    os.makedirs(directory, exist_ok=True)