|
|
|
|
|
""" |
|
Created by Shengbo.Zhang on 2021/08/13 |
|
""" |
|
|
|
|
|
import io |
|
import re |
|
import os |
|
import csv |
|
import logging |
|
from docx import Document |
|
from pdf2docx import Converter |
|
from Pdf2Txt.config import * |
|
from pdfminer.layout import LAParams |
|
from pdfminer.pdfpage import PDFPage |
|
from pdfminer.converter import TextConverter |
|
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter |
|
from Pdf2Txt.config import _check_ann_title_processable |
|
|
|
|
|
|
|
# Suppress every log record at WARNING level and below for the whole process.
# One call is sufficient: logging.disable() only remembers the most recent
# threshold, so a preceding disable(logging.INFO) would simply be overwritten.
logging.disable(logging.WARNING)
|
|
|
|
|
|
|
def _get_txt_from_pdf(pdf_path, out_path):
    '''
    Read a PDF file and convert it directly to a UTF-8 encoded txt file.

    :param pdf_path: full path of the input PDF announcement file
    :param out_path: full path of the output txt file
    :return: bool, True once the txt file has been written
    '''
    manager = PDFResourceManager()
    output = io.StringIO()
    converter = TextConverter(manager, output, laparams=LAParams())
    try:
        interpreter = PDFPageInterpreter(manager, converter)
        with open(pdf_path, 'rb') as infile:
            for page in PDFPage.get_pages(infile, check_extractable=True):
                interpreter.process_page(page)
        # Read the buffer once, after every page has been processed: the
        # StringIO accumulates across pages, so reading it per page would
        # repeat the text of all earlier pages.
        text = output.getvalue()
    finally:
        # Release the converter and the buffer even when parsing fails.
        converter.close()
        output.close()

    # Collapse runs of newlines, then glue lines that start with spaces
    # onto the previous line.
    text = re.sub('\n+', '\n', text)
    text = re.sub('\n +', '', text)
    # NOTE(review): as written this replace() has an empty search string and
    # changes nothing; presumably it once removed a special character -
    # confirm against the original file.
    text = text.replace('', '')

    with open(out_path, 'wb') as f:
        f.write(text.encode('utf-8'))
    return True
|
|
|
|
|
|
|
def _get_cleaned_txt(txtPath, out_path): |
|
''' |
|
对Txt文件进行内容格式清洗(暂时仅供测试) |
|
:param txtPath: 输入的txt文件的完整路径 |
|
:param out_path: 输出的txt文件的完整路径 |
|
:return: bool |
|
''' |
|
with open(txtPath, 'rb')as f: |
|
content = f.read().decode('utf-8') |
|
p = re.compile(r'(?<=##)\S.+(?=##)|[\u4e00-\u9fff+\u3002\uFF0C]') |
|
x = ''.join(re.findall(p, content)) |
|
final_result = re.sub(u"[\uFF0C|\u3002|\u002B]{2,}", "", x) |
|
with open(out_path, "w")as txtPath: |
|
txtPath.write(final_result) |
|
|
|
return True |
|
|
|
|
|
|
|
def get_docx_from_pdf(pdf_path, out_path):
    '''
    Read a PDF file, convert it to Docx format and store it locally.

    :param pdf_path: full path of the input PDF announcement file
    :param out_path: full path of the intermediate docx result file
    :return: bool, False when the conversion raises or any page is left
             un-finalized, True otherwise
    '''
    try:
        cv = Converter(pdf_path)
    except Exception:
        return False

    try:
        try:
            cv.convert(out_path)
        except Exception:
            return False
        # The conversion only counts as successful if every page was finalized.
        return all(page.finalized for page in cv.pages)
    finally:
        # Always release the converter, including when convert() failed.
        cv.close()
|
|
|
|
|
|
|
def _find_key_indexs(str, key): |
|
''' |
|
给定一个父字符串和子串,在父串中查找子串的所有索引位置,并返回一个包含所有下标的列表 |
|
:param str: 父字符串 |
|
:param key: 子字符串 |
|
:return: list |
|
''' |
|
lstKey = [] |
|
countStr = str.count(key) |
|
if countStr < 1: |
|
return [] |
|
elif countStr == 1: |
|
indexKey = str.find(key) |
|
return [indexKey] |
|
else: |
|
indexKey = str.find(key) |
|
lstKey.append(indexKey) |
|
while countStr > 1: |
|
str_new = str[indexKey + 1:len(str) + 1] |
|
indexKey_new = str_new.find(key) |
|
indexKey = indexKey + 1 + indexKey_new |
|
lstKey.append(indexKey) |
|
countStr -= 1 |
|
lstKey.sort(reverse=True) |
|
return lstKey |
|
|
|
|
|
|
|
def _insert_char_into_str(str, idx, char): |
|
''' |
|
给定一个父字符串、下标位置、子串,在父串中的下标位置插入子串,并返回一个新的字符串 |
|
:param str: 父字符串 |
|
:param idx: 插入位置索引 |
|
:param char: 子字符串 |
|
:return: str |
|
''' |
|
tmp = list(str) |
|
tmp.insert(idx, char) |
|
return ''.join(tmp) |
|
|
|
|
|
|
|
def _is_chinese(str): |
|
''' |
|
给定一个字符串,判断该字符串是否全是中文 |
|
:param str: 字符串 |
|
:return: bool |
|
''' |
|
for ch in str: |
|
if '\u4e00' <= ch <= '\u9fff': |
|
return True |
|
return False |
|
|
|
|
|
|
|
def _get_table_row_feat(str): |
|
''' |
|
给定一个空格分割的表格行字符串,计算它的特征(01组成的字符串) |
|
:param str: 字符串 |
|
:return: 字符串 |
|
''' |
|
s = str.split() |
|
r = '' |
|
for c in s: |
|
try: |
|
_ = float(c) |
|
r += '1' |
|
except Exception: |
|
r += '0' |
|
return r |
|
|
|
|
|
|
|
def _check_if_include_first_proper(s, corpus): |
|
''' |
|
检查字符串s中是否包含语料列表first_corpus中的某一内容 |
|
:param s: 字符串 |
|
:param corpus: 字符串列表 |
|
:return: [bool, str] |
|
''' |
|
for i in corpus: |
|
if i in s: |
|
return [True, i] |
|
return [False, ''] |
|
|
|
|
|
|
|
def _check_if_include_second_proper(s, corpus): |
|
''' |
|
检查字符串s中是否包含语料列表first_corpus中的某一内容 |
|
:param s: 字符串 |
|
:param corpus: 字符串列表 |
|
:return: list |
|
''' |
|
res = [] |
|
for i in corpus: |
|
if i in s: |
|
res.append([True, i]) |
|
else: |
|
res.append([False, i]) |
|
return res |
|
|
|
|
|
|
|
def _match_and_insert(string, pattern, substring): |
|
''' |
|
匹配string字符串中的pattern,计算所有pattern在string中的首个字符索引位置,并在string从后向前插入substring至这些位置 |
|
:param string: 待匹配的字符串 |
|
:param pattern: 匹配模式 |
|
:param substring: 待插入的子字符串 |
|
:return: 插入后的字符串 |
|
''' |
|
idx_list = [] |
|
for j in re.finditer(pattern, string): |
|
idx_list.append(j.span()[0]) |
|
|
|
idx_list.sort(reverse=True) |
|
if idx_list != []: |
|
for k in idx_list: |
|
if k > 0 and string[k-1] != '“': |
|
string = _insert_char_into_str(string, k, substring) |
|
return string |
|
|
|
|
|
|
|
def _match_and_delete(string, pattern): |
|
''' |
|
匹配string字符串中的pattern,计算pattern在string中的首个字符索引位置,删除该索引前2个位置的换行符\n |
|
:param string: 待匹配的字符串 |
|
:param pattern: 匹配模式 |
|
:return: 删除'\n\n'子字符串后的字符串 |
|
''' |
|
matcher = re.search(pattern, string) |
|
if matcher: |
|
k = matcher.span()[0] |
|
if k >= 2 and string[k-1] == '\n' and string[k-2] == '\n': |
|
string = string[:k-2] + string[k:] |
|
return string |
|
|
|
|
|
|
|
def get_txt_from_docx(doc):
    '''
    Read the text of every paragraph of a Docx document and reformat it.

    Blank paragraphs, paragraphs made only of digits followed by a space,
    and paragraphs starting with '单位' are dropped. The remaining ones go
    through a series of heuristics that join wrapped lines and re-insert
    paragraph breaks in front of numbered items and known key phrases.

    :param doc: a Document instance (only ``doc.paragraphs`` is read here)
    :return: bool (whether the conversion succeeded), list (the reformatted
             text pieces); ``(False, [])`` when any exception is raised
    '''

    # Characters treated as the start of a numbered heading.
    NUMBER_1 = '123456789一二三四五六七八九十'

    # Digits and ASCII letters, used to detect lines that continue a sentence.
    NUMBER_2 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'

    # One entry per paragraph, each terminated by a newline.
    paras = [para.text+'\n' for i, para in enumerate(doc.paragraphs)]

    # Paragraphs kept after filtering (leading whitespace stripped).
    new_paras = []

    # Length of each kept paragraph, measured before stripping.
    new_paras_len_cnt = []

    try:

        for val in paras:

            # Skip empty lines, digit-only lines (presumably page numbers)
            # and lines starting with '单位'.
            if val == '\n' or re.search('^[0-9]+ \n$', val) or val[:2] == '单位':
                continue

            new_paras.append(val.lstrip())

            new_paras_len_cnt.append(len(val))

        # Index of the first paragraph after the announcement title;
        # stays 0 when no title is recognised in the first ten paragraphs.
        line_mark = 0

        for i, val in enumerate(new_paras[:10]):

            # Normalise whitespace on lines with tabs or several spaces and
            # keep the line as-is when it carries the security code.
            if '\t' in val or val.count(' ') > 2:
                new_paras[i] = ' '.join(val.split()) + '\n'
                if '证券代码:' in new_paras[i]:
                    continue

            # A line ending with '有限公司' only has its spaces removed.
            if val.replace(' ', '')[-5:] == '有限公司\n':
                new_paras[i] = val.replace(' ', '')
                continue

            # Stop at the title line; any other header line loses its spaces
            # and its newline so it is glued to the following line.
            if _check_ann_title_processable(val.replace(' ', ''), exp=1):
                new_paras[i] = val.replace(' ', '')
                line_mark = i + 1
                break
            else:
                new_paras[i] = val.replace('\n', '').replace(' ', '')

        # Average paragraph length; raises ZeroDivisionError (caught below)
        # when no paragraph was kept.
        mean_len = sum(new_paras_len_cnt)//len(new_paras_len_cnt)

        for i, _ in enumerate(new_paras):

            # Only body paragraphs (after the title) are processed.
            if i >= line_mark:

                # NOTE(review): as written the second and third replace()
                # calls repeat the first one / search for an empty string and
                # change nothing; presumably they once targeted other
                # whitespace characters - confirm against the original file.
                new_paras[i] = new_paras[i]\
                    .replace(' ', '')\
                    .replace(' ', '')\
                    .replace('', '')\
                    .replace(',', '')

                # Drop the newline of a line at least as long as the average
                # when the next line does not start with a NUMBER_1 character
                # (or simply ends with a newline).
                if i < len(new_paras)-1 and \
                        len(new_paras[i]) >= mean_len and \
                        ((new_paras[i + 1].replace('(','').replace('(','')[0] not in NUMBER_1) or
                         new_paras[i + 1][-1] == '\n'):
                    new_paras[i] = new_paras[i].replace('\n', '')

                # Drop the newline when the next line starts with a digit or
                # letter and has no '.', '、' in its first three characters
                # and no '年' anywhere.
                if i < len(new_paras)-2 and \
                        len(new_paras[i + 1]) >= 3 and \
                        new_paras[i + 1].replace('(','').replace('(','')[0] in NUMBER_2 and \
                        (not '.' in new_paras[i+1][:3]) and \
                        (not '、' in new_paras[i+1][:3]) and \
                        (not '年' in new_paras[i+1]):
                    new_paras[i] = new_paras[i].replace('\n', '')

                for j in _find_key_indexs(new_paras[i], ':'):

                    # Break the line after a ':' that is not already followed
                    # by a newline, unless the line holds brackets, book-title
                    # marks or an entry of FIRST_PROPER_CORPUS.
                    if j < len(new_paras[i])-1 and new_paras[i][j+1] != '\n' and \
                            ('(' not in new_paras[i]) and ('《' not in new_paras[i]) and \
                            (')' not in new_paras[i]) and ('》' not in new_paras[i]) and \
                            (not _check_if_include_first_proper(new_paras[i], FIRST_PROPER_CORPUS)[0]):
                        new_paras[i] = _insert_char_into_str(new_paras[i], j+1, '\n')

                for j in _find_key_indexs(new_paras[i], '('):

                    # Break the line before '(' followed by a NUMBER_1
                    # character when the previous paragraph has no trailing
                    # newline and the preceding character is neither Chinese
                    # nor '》'.
                    if new_paras[i][j+1] in NUMBER_1 and new_paras[i-1][-1] != '\n' and \
                            (not _is_chinese(new_paras[i][j-1])) and new_paras[i][j-1] != '》':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n')

                # NOTE(review): as written this loop is identical to the one
                # above; presumably one of them originally searched for a
                # different bracket character - confirm.
                for j in _find_key_indexs(new_paras[i], '('):

                    if new_paras[i][j + 1] in NUMBER_1 and new_paras[i - 1][-1] != '\n' and \
                            (not _is_chinese(new_paras[i][j - 1])) and new_paras[i][j - 1] != '》':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n')

                for j in _find_key_indexs(new_paras[i], '、'):

                    # One-character number before '、', preceded by a sentence
                    # terminator: break the line before the number.
                    if (j-2) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] not in NUMBER_1 \
                            and new_paras[i][j-2] in '。;.;' and new_paras[i-1][-1] != '\n':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j-1, '\n')
                        continue

                    # Two-character number before '、', preceded by a sentence
                    # terminator: break the line before the number.
                    if (j-3) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] in NUMBER_1 \
                            and new_paras[i][j-3] in '。;.;' and new_paras[i-1][-1] != '\n':
                        new_paras[i] = _insert_char_into_str(new_paras[i], j-2, '\n')

                # Make sure '特此公告。' stands on its own line.
                if new_paras[i] == '特此公告。\n':
                    if new_paras[i-1][-1] != '\n':
                        new_paras[i] = '\n特此公告。\n'
                    # NOTE(review): i+1 is not bounds-checked; when this is the
                    # last paragraph the IndexError is swallowed by the except
                    # below and the whole conversion reports failure.
                    if new_paras[i+1][-1] != '\n':
                        new_paras[i+1] += '\n'

                # End the current line when the next one contains an entry of
                # FIRST_PROPER_CORPUS that is not at its start and is not
                # directly preceded by '('.
                if (i+1) < len(new_paras):
                    tmp_flag, tmp_str = _check_if_include_first_proper(new_paras[i+1], FIRST_PROPER_CORPUS)
                    if tmp_flag:
                        tmp_idx = new_paras[i+1].index(tmp_str) - 1
                        if tmp_idx >= 0 and new_paras[i+1][tmp_idx] != '(':
                            if new_paras[i][-1] != '\n':
                                new_paras[i] += '\n'

        # Second pass: re-split the joined text on single newlines.
        str_sum = ''.join(new_paras)

        final_paras = str_sum.split('\n')

        for i, val in enumerate(final_paras):

            # Every piece is terminated by a blank line.
            end_flag = '\n\n'

            final_paras[i] += end_flag

            # Start a new paragraph before bracketed numbers such as '(1)'.
            if '(' in final_paras[i]:
                final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag)

            # With unbalanced brackets, drop the terminator so the piece is
            # glued to the next one.
            if len(_find_key_indexs(final_paras[i], '(')) != len(_find_key_indexs(final_paras[i], ')')):
                final_paras[i] = final_paras[i][:-2]

        # Third pass: re-split the joined text on blank lines.
        str_sum = ''.join(final_paras)

        final_paras = str_sum.split('\n\n')

        for i, val in enumerate(final_paras):

            end_flag = '\n\n'

            final_paras[i] += end_flag

            # Isolate the '重要内容提示:' heading with blank lines on both sides.
            if '重要内容提示:' in final_paras[i]:
                idx = final_paras[i].index('重要内容提示:')
                if final_paras[i][idx+7] != '\n':
                    final_paras[i] = _insert_char_into_str(final_paras[i], idx+7, '\n\n')
                if idx > 0:
                    if final_paras[i][idx-1] != '\n':
                        final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')

            # Start a new paragraph at '表决结果:'; when nothing follows the
            # phrase, drop the terminator so its value is attached.
            if '表决结果:' in final_paras[i]:
                if final_paras[i][:5] == '表决结果:':
                    final_paras[i] = final_paras[i][:-2]
                elif final_paras[i][-7:] == '表决结果:\n\n':
                    idx = final_paras[i].find('表决结果:')
                    final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')
                    final_paras[i] = final_paras[i][:-2]
                else:
                    idx = final_paras[i].find('表决结果:')
                    final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')

            for is_include, s_include in _check_if_include_second_proper(final_paras[i], SECOND_PROPER_CORPUS):
                if is_include:

                    # An entry of SECOND_PROPER_CORPUS directly followed by a
                    # newline: remove every newline from the piece.
                    if final_paras[i][final_paras[i].index(s_include)+len(s_include)] == '\n':
                        final_paras[i] = final_paras[i].replace('\n', '')

            # Start a new paragraph before bracketed Chinese or Arabic numbers.
            if '(' in final_paras[i]:
                final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', end_flag)
                final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag)

            # Start a new paragraph before 'N、' items, then undo the break
            # in front of the first enumeration that matches the patterns below.
            if '、' in final_paras[i]:
                final_paras[i] = _match_and_insert(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', end_flag)
                final_paras[i] = _match_and_insert(final_paras[i], '[0-9]{1,2}、', end_flag)

                final_paras[i] = _match_and_delete(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、[\S]+、[\S]+')
                final_paras[i] = _match_and_delete(final_paras[i], '[0-9]+、[0-9]+')

            # Start a new paragraph before every '●' bullet that is not at
            # the very beginning of the piece.
            for j in _find_key_indexs(final_paras[i], '●'):
                if j > 0:
                    final_paras[i] = _insert_char_into_str(final_paras[i], j, end_flag)

    except Exception:
        # Any failure in the heuristics above is reported as an unsuccessful
        # conversion with an empty result.
        return False, []

    return True, final_paras
|
|
|
|
|
|
|
def get_table_from_docx(doc, txt, out_path="", is_out_flag=False):
    '''
    Read the content of every table of a Docx document.

    Table rows are appended to ``txt`` (which is modified in place) as
    tab-separated lines, with marker lines around the tables. Rows whose
    first cell contains '本公司' are returned separately, and rows whose
    first cell contains '证券代码' are moved to the front of ``txt``.

    :param doc: a Document instance (only ``doc.tables`` is read here)
    :param txt: list of strings holding the body text of the PDF
    :param out_path: full path of the output csv file
    :param is_out_flag: whether to write the csv file, off by default
    :return: list (``txt`` with the table lines appended),
             dict (body text mistaken for a table, stored under key '000')
    '''
    data = []
    table_txt = []
    attach_txt = {}
    for table in doc.tables[:]:
        table_txt.append('-----表格-----\n')
        for row in table.rows[:]:
            row_content = [
                cell.text.replace('\n', '').replace(' ', '').replace('\t', '').replace(',', '')
                for cell in row.cells[:]
            ]
            if not row_content:
                continue
            if '本公司' in row_content[0]:
                # Body text mistaken for a table: keep it aside for the caller.
                attach_txt['000'] = ''.join(line.strip() for line in row_content) + '\n\n'
                continue
            if '证券代码' in row_content[0]:
                # Header row: put it at the very front of the body text.
                # list.insert takes (index, object); the reversed order used
                # previously raised TypeError.
                tmp = '^' + ''.join(line.strip() + ' ' for line in row_content) + '$\n'
                txt.insert(0, tmp)
                continue
            data.append(row_content)
            new_row = '^' + '\t'.join(row_content) + '$\n'
            if new_row.replace('\t', '') != '^$\n':
                table_txt.append(new_row)
        data.append('-----表格-----\n')
        table_txt.append('-----表格-----\n')

    # Collapse consecutive markers into a single one.
    flag = False
    for i, val in enumerate(table_txt):
        if val == '-----表格-----\n':
            if not flag:
                flag = True
            else:
                table_txt[i] = '^$\n'
        else:
            flag = False

    table_txt = list(filter(lambda x: x != '^$\n', table_txt))

    # Drop a marker between two rows with the same numeric/text feature,
    # which merges the tables on both sides of it.
    for i, val in enumerate(table_txt):
        if val == '-----表格-----\n' and (i > 0) and (i < len(table_txt)-1):
            feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', ''))
            feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', ''))
            if feat1 == feat2:
                table_txt[i] = '^$\n'

    # A lone marker means no table row survived.
    if len(table_txt) == 1 and table_txt[0] == '-----表格-----\n':
        table_txt[0] = '^$\n'

    for i, val in enumerate(table_txt):
        # NOTE(review): this comparison lacks the trailing '\n', so it never
        # matches; markers fall through to the trimming below and become
        # '----表格----\n'. Left unchanged so the emitted text stays the same.
        if val == '-----表格-----':
            continue
        if val == '^$\n':
            table_txt[i] = ''
            continue
        # Strip the leading '^' and the trailing '$\n' wrapper.
        table_txt[i] = val[1:][:-2] + '\n'

    txt.extend(table_txt)

    if is_out_flag:
        with open(out_path, 'w+', newline='') as f:
            writer = csv.writer(f)
            for i, val in enumerate(data):
                if i == 0 and val == '\n':
                    continue
                # NOTE(review): marker entries in data are plain strings, so
                # writerow() emits one character per column for them.
                writer.writerow(val)

    return txt, attach_txt
|
|
|
|
|
|
|
def refine_pdf2txt_list_result(txt, att_txt):
    '''
    Final proofreading pass over the txt string list.

    Looks at the first eleven entries only. An entry whose text before its
    last two characters ends with '有限公司' loses those two characters.
    At the first entry accepted by ``_check_ann_title_processable`` (when
    att_txt holds the key '000'), the attached text is inserted right
    after it and the scan stops.

    :param txt: list of strings holding the body text of the PDF
                (modified in place)
    :param att_txt: dict of body text that was mistaken for a table
    :return: list, the same ``txt`` object
    '''
    for pos, entry in enumerate(txt[:11]):
        if entry[-6:-2] == '有限公司':
            txt[pos] = entry[:-2]
        elif '000' in att_txt and _check_ann_title_processable(entry, exp=2):
            txt.insert(pos + 1, att_txt['000'])
            break
    return txt
|
|
|
|
|
|
|
def write_pdf2txt_list_result(out_path, txt, out_mode_flag=True):
    '''
    Write the txt string list to a UTF-8 text file.

    With ``out_mode_flag`` set, the entries are joined, split on newlines,
    and every non-empty line is written wrapped as '^line$'. Otherwise the
    entries are written unchanged, skipping entries equal to '^$\\n'.

    :param out_path: path of the txt file to create
    :param txt: list of strings holding the body text and tables of the PDF
    :param out_mode_flag: whether to add the '^' paragraph-start and
                          '$' paragraph-end marks
    :return: bool, True once the file has been written
    '''
    with open(out_path, "w", encoding='utf-8') as handle:
        if out_mode_flag:
            for segment in ''.join(txt).split('\n'):
                if segment == '':
                    continue
                handle.write('^' + segment + '$\n')
        else:
            for entry in txt:
                if entry == '^$\n':
                    continue
                handle.write(entry)
    return True
|
|
|
|
|
|
|
def get_pdf2txt_str_result(txt, out_mode_flag=True):
    '''
    Join the entries of the txt string list into the complete txt content.

    With ``out_mode_flag`` set, the entries are joined, split on newlines,
    and every non-empty line is wrapped as '^line$'. Otherwise the entries
    are concatenated unchanged, skipping entries equal to '^$\\n'. An empty
    list yields an empty string.

    :param txt: list of strings holding the body text and tables of the PDF
    :param out_mode_flag: whether to add the '^' paragraph-start and
                          '$' paragraph-end marks
    :return: str
    '''
    # The content is assembled exactly once; looping over txt around this
    # logic would rebuild (and re-append) the whole text for every entry.
    if not out_mode_flag:
        return ''.join(entry for entry in txt if entry != '^$\n')

    segments = ''.join(txt).split('\n')
    return ''.join('^' + segment + '$\n' for segment in segments if segment != '')
|
|
|
|
|
def find_all_local_file(base, extension):
    '''
    Find every file with the given suffix under a directory, recursively.

    The suffix is matched case-insensitively, so '.pdf' also finds
    'x.PDF' and 'x.Pdf'.

    :param base: directory path
    :param extension: file suffix, for example: '.pdf'
    :return: generator yielding one path string per matching file, with
             every path separator written as '//'
    '''
    wanted = extension.lower()
    for root, _dirs, files in os.walk(base):
        for name in files:
            if name.lower().endswith(wanted):
                # Both '/' and '\\' separators are rewritten as '//'.
                yield os.path.join(root, name).replace('/', '//').replace('\\', '//')