Spaces:
Runtime error
Runtime error
import os | |
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
from pathlib import Path | |
import time | |
import hashlib | |
from datetime import datetime | |
import torch | |
from PIL import Image | |
import faiss | |
# 必要なライブラリをインポート | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores import Chroma | |
from langchain.chains import RetrievalQA | |
from langchain.prompts import PromptTemplate | |
from langchain.llms import HuggingFacePipeline | |
from langchain.schema import Document | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
from transformers import T5ForConditionalGeneration, T5Tokenizer | |
from sentence_transformers import SentenceTransformer | |
model = SentenceTransformer("all-MiniLM-L6-v2") # 例 | |
print(model.get_sentence_embedding_dimension()) # 768 のはず | |
# モデルをロード | |
model = SentenceTransformer("all-MiniLM-L6-v2") # 768次元の埋め込みを生成 | |
# 既存のインデックスファイルを削除する(手動で削除するか、スクリプトで削除する) | |
if os.path.exists("faiss_index"): | |
os.remove("faiss_index") | |
# FAISSインデックスを作り直す | |
embedding_dim = 768 # ここをモデルに合わせる | |
index = faiss.IndexFlatL2(embedding_dim) # L2距離で検索 | |
# 新しいインデックスを保存 | |
faiss.write_index(index, "faiss_index") | |
#model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2") # 1024次元のモデル | |
# 条件付きインポート(ローカル環境とHugging Face Spacesの両方に対応) | |
try: | |
import fitz # PyMuPDF | |
PYMUPDF_AVAILABLE = True | |
except ImportError: | |
PYMUPDF_AVAILABLE = False | |
print("PyMuPDFが利用できません。PDFファイルはテキスト抽出のみで処理されます。") | |
try: | |
import easyocr | |
EASYOCR_AVAILABLE = True | |
except ImportError: | |
EASYOCR_AVAILABLE = False | |
print("EasyOCRが利用できません。OCR機能は無効化されます。") | |
try: | |
import cv2 | |
CV2_AVAILABLE = True | |
except ImportError: | |
CV2_AVAILABLE = False | |
print("OpenCVが利用できません。画像処理機能は制限されます。") | |
class ManualChatbot: | |
def __init__(self, docs_dir="./manuals"): | |
"""手順書チャットボットの初期化""" | |
self.docs_dir = docs_dir | |
self.vectorstore = None # ベクトルデータベースの初期化 | |
self.file_hashes = {} # ファイルのハッシュ値を保持する辞書 | |
self.last_update_check = None # 最後に更新をチェックした時間 | |
self.processing_status = "未初期化" | |
# ディレクトリが存在しなければ作成 | |
os.makedirs(docs_dir, exist_ok=True) | |
os.makedirs("./chroma_db", exist_ok=True) | |
# ファイルハッシュの記録ファイルパス | |
self.hash_file_path = os.path.join(os.path.dirname(docs_dir), "file_hashes.json") | |
# OCRの初期化(可能な場合) | |
if EASYOCR_AVAILABLE: | |
self.reader = easyocr.Reader(['ja', 'en']) # 日本語と英語に対応 | |
print("EasyOCRを初期化しました") | |
else: | |
self.reader = None | |
# 要約用の T5 モデル準備(モデルサイズを小さくしてHF Spacesでの動作に最適化) | |
self.summarizer_model = None | |
self.summarizer_tokenizer = None | |
# ハッシュ読み込み | |
self._load_file_hashes() | |
def _load_file_hashes(self): | |
"""保存されたファイルハッシュを読み込む""" | |
if os.path.exists(self.hash_file_path): | |
try: | |
import json | |
with open(self.hash_file_path, 'r') as f: | |
self.file_hashes = json.load(f) | |
print(f"{len(self.file_hashes)}件のファイルハッシュを読み込みました") | |
except Exception as e: | |
print(f"ファイルハッシュの読み込みに失敗しました: {str(e)}") | |
self.file_hashes = {} | |
else: | |
self.file_hashes = {} | |
def _save_file_hashes(self): | |
"""ファイルハッシュを保存する""" | |
try: | |
import json | |
with open(self.hash_file_path, 'w') as f: | |
json.dump(self.file_hashes, f) | |
print(f"{len(self.file_hashes)}件のファイルハッシュを保存しました") | |
except Exception as e: | |
print(f"ファイルハッシュの保存に失敗しました: {str(e)}") | |
def _get_file_hash(self, file_path): | |
"""ファイルのMD5ハッシュを計算する""" | |
hash_md5 = hashlib.md5() | |
with open(file_path, "rb") as f: | |
for chunk in iter(lambda: f.read(4096), b""): | |
hash_md5.update(chunk) | |
return hash_md5.hexdigest() | |
def process_uploaded_files(self, files): | |
""" | |
Gradioからアップロードされたファイルを処理する | |
:param files: アップロードされたファイルのリスト | |
:return: 処理状況を示すメッセージ | |
""" | |
if not files: | |
return "ファイルがアップロードされていません" | |
self.processing_status = "処理中..." | |
# 新しく追加されたファイルを一時的に保存し処理する | |
file_paths = [] | |
for file in files: | |
if file is None: | |
continue | |
# ファイル拡張子を確認 | |
filename = getattr(file, "orig_name", os.path.basename(file.name)) #file.name | |
file_ext = os.path.splitext(filename)[1].lower() | |
if file_ext not in ['.pdf', '.xlsx', '.xls', '.png', '.jpg', '.jpeg']: | |
continue | |
# ファイルを保存する | |
save_path = os.path.join(self.docs_dir, os.path.basename(filename)) | |
with open(save_path, 'wb') as f: | |
f.write(file.read()) | |
file_paths.append(save_path) | |
if not file_paths: | |
self.processing_status = "サポートされているファイルがありませんでした" | |
return "サポートされているファイルがありませんでした(.pdf, .xlsx, .xls, .png, .jpg, .jpeg)" | |
# ファイルを処理して知識ベースを更新 | |
self.update_knowledge_base(file_paths) | |
self.processing_status = "準備完了" | |
return f"{len(file_paths)}個のファイルが処理され、知識ベースに追加されました" | |
def update_knowledge_base(self, file_paths): | |
""" | |
指定したファイルから新しいデータを読み込み、インデックスを更新する | |
:param file_paths: 更新したファイルのパス一覧(リスト) | |
""" | |
print(f"{len(file_paths)}件のファイルを処理します...") | |
new_documents = [] | |
for file_path in file_paths: | |
if file_path.lower().endswith(".pdf"): | |
new_documents.extend(self._process_pdf(file_path)) | |
elif file_path.lower().endswith((".xlsx", ".xls")): | |
new_documents.extend(self._process_excel(file_path)) | |
elif file_path.lower().endswith((".png", ".jpg", ".jpeg")): | |
new_documents.extend(self._process_image(file_path)) | |
if not new_documents: | |
print("処理対象のドキュメントがありませんでした") | |
return | |
print(f"{len(new_documents)}件のドキュメントを処理しました") | |
# テキスト分割 | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=1000, | |
chunk_overlap=200, | |
separators=["\n\n", "\n", "。", "、", " ", ""] | |
) | |
chunks = text_splitter.split_documents(new_documents) | |
print(f"{len(chunks)}個のテキストチャンクに分割しました") | |
# 埋め込みモデルの初期化 | |
embeddings = HuggingFaceEmbeddings( | |
model_name="intfloat/multilingual-e5-base", # 軽量化のためbaseモデルを使用 | |
model_kwargs={'device': 'cpu'} # Spacesでは常にCPUを使用 | |
) | |
# 既存のベクトルストアが存在する場合は追加、なければ新規作成 | |
if self.vectorstore is None: | |
self.vectorstore = Chroma.from_documents( | |
documents=chunks, | |
embedding=embeddings, | |
persist_directory="./chroma_db" | |
) | |
self.vectorstore.persist() | |
else: | |
# 既存のベクトルストアに新しいドキュメントを追加 | |
self.vectorstore.add_documents(chunks) | |
# ベクトルストアを保存 | |
self.vectorstore.persist() | |
# もしQAチェーンがなければ初期化 | |
if not hasattr(self, 'qa_chain') or self.qa_chain is None: | |
self._initialize_qa_chain() | |
else: | |
# QAチェーンを更新された検索エンジンで更新 | |
self.qa_chain.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3}) | |
print("知識ベースを更新しました!") | |
def _process_pdf(self, file_path): | |
"""PDFファイルを処理してドキュメントを返す""" | |
try: | |
# PyMuPDFが利用可能な場合 | |
if PYMUPDF_AVAILABLE: | |
doc = fitz.open(file_path) | |
all_text = "" | |
for page_num, page in enumerate(doc): | |
text = page.get_text() | |
all_text += f"--- Page {page_num + 1} ---\n{text}\n\n" | |
# OCRが必要か確認(テキストが少ない場合) | |
if len(all_text.strip()) < 100 and EASYOCR_AVAILABLE and self.reader: | |
all_text = self.extract_text_from_pdf_with_ocr(file_path) | |
return [Document(page_content=all_text, metadata={"source": file_path})] | |
else: | |
# 簡易処理(PyMuPDFが利用できない場合) | |
# 注意: この場合はPDFの内容を適切に抽出できない可能性がある | |
return [Document(page_content=f"PDF file: {os.path.basename(file_path)}", | |
metadata={"source": file_path})] | |
except Exception as e: | |
print(f"PDFファイルの処理中にエラーが発生しました ({file_path}): {str(e)}") | |
return [] | |
def _process_excel(self, file_path): | |
"""Excelファイルを処理してドキュメントを返す""" | |
try: | |
# Pandas でExcelを読み込む | |
dfs = pd.read_excel(file_path, sheet_name=None, engine="openpyxl" if file_path.endswith(".xlsx") else "xlrd") | |
documents = [] | |
for sheet_name, df in dfs.items(): | |
# NaN値を空文字列に変換 | |
df = df.fillna('') | |
# 各行をテキストに変換 | |
for idx, row in df.iterrows(): | |
content = f"Sheet: {sheet_name}, Row: {idx}\n" | |
for col in df.columns: | |
content += f"{col}: {row[col]}\n" | |
doc = Document( | |
page_content=content, | |
metadata={"source": file_path, "sheet": sheet_name, "row": idx} | |
) | |
documents.append(doc) | |
return documents | |
except Exception as e: | |
print(f"Excelファイルの処理中にエラーが発生しました ({file_path}): {str(e)}") | |
return [] | |
def _process_image(self, file_path): | |
"""画像ファイルを処理してドキュメントを返す""" | |
try: | |
if not EASYOCR_AVAILABLE or not self.reader: | |
return [Document( | |
page_content=f"画像ファイル: {os.path.basename(file_path)} (OCR未対応)", | |
metadata={"source": file_path} | |
)] | |
img = Image.open(file_path) | |
# EasyOCRで画像からテキストを抽出 | |
result = self.reader.readtext(np.array(img)) | |
# 抽出されたテキストを結合 | |
text = "\n".join([detection[1] for detection in result]) | |
if not text.strip(): | |
text = f"画像ファイル: {os.path.basename(file_path)} (テキスト検出なし)" | |
# ドキュメントとしてリストに追加 | |
return [Document(page_content=text, metadata={"source": file_path})] | |
except Exception as e: | |
print(f"画像ファイルの処理中にエラーが発生しました ({file_path}): {str(e)}") | |
return [] | |
def _initialize_qa_chain(self): | |
"""QAチェーンを初期化する""" | |
try: | |
# LLMの初期化(小さいモデルを使用) | |
model_name = "cyberagent/open-calm-small" # 日本語対応の小さいモデル | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModelForCausalLM.from_pretrained(model_name) | |
pipe = pipeline( | |
"text-generation", | |
model=model, | |
tokenizer=tokenizer, | |
max_new_tokens=300, | |
temperature=0.7, | |
do_sample=True, | |
device="cpu" # Spaces環境ではCPU使用 | |
) | |
local_llm = HuggingFacePipeline(pipeline=pipe) | |
# プロンプトテンプレート | |
template = """ | |
次の手順書データを使って質問に答えてください。 | |
### 手順書データ: | |
{context} | |
### 質問: | |
{question} | |
### 回答: | |
""" | |
prompt = PromptTemplate( | |
template=template, | |
input_variables=["context", "question"] | |
) | |
# QAチェーンの作成 | |
self.qa_chain = RetrievalQA.from_chain_type( | |
llm=local_llm, | |
chain_type="stuff", | |
retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}), | |
chain_type_kwargs={"prompt": prompt}, | |
return_source_documents=True | |
) | |
print("QAチェーンを初期化しました") | |
except Exception as e: | |
print(f"QAチェーンの初期化中にエラーが発生しました: {str(e)}") | |
self.qa_chain = None | |
def extract_text_from_pdf_with_ocr(self, pdf_path): | |
"""PDFファイルからテキストを抽出し、必要に応じてOCRを適用する""" | |
if not PYMUPDF_AVAILABLE or not EASYOCR_AVAILABLE or not self.reader: | |
return f"PDF: {os.path.basename(pdf_path)} (OCR未対応)" | |
doc = fitz.open(pdf_path) | |
full_text = "" | |
for page_num, page in enumerate(doc): | |
# テキストの抽出を試みる | |
text = page.get_text() | |
# テキストが少ない場合はOCRを適用する | |
if len(text.strip()) < 50: # 少ないテキストの閾値 | |
# ページを画像として抽出 | |
pix = page.get_pixmap() | |
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
img_np = np.array(img) | |
# EasyOCRを使用してテキスト抽出 | |
result = self.reader.readtext(img_np) | |
ocr_text = "\n".join([detection[1] for detection in result]) | |
# OCRテキストを使用 | |
text = ocr_text if ocr_text.strip() else text | |
full_text += f"--- Page {page_num + 1} ---\n{text}\n\n" | |
return full_text | |
def ask(self, question): | |
"""質問をボットに問いかけ、回答と参照ソースを取得する""" | |
if not hasattr(self, 'qa_chain') or self.qa_chain is None: | |
return "チャットボットがまだ初期化されていません。ファイルをアップロードしてください。", "" | |
try: | |
result = self.qa_chain.invoke({"query": question}) #({"question": question}) | |
# 回答の取得 | |
if "result" in result: | |
answer = result["result"] | |
else: | |
return "回答を生成できませんでした。", "" | |
# 参照ソースの取得 | |
source_documents = result.get("source_documents", []) | |
sources_text = "" | |
if source_documents: | |
sources_text = "参照ソース:\n" | |
for i, doc in enumerate(source_documents, 1): | |
source = doc.metadata.get("source", "不明") | |
filename = os.path.basename(source) | |
sources_text += f"{i}. {filename}\n" | |
return answer, sources_text | |
except Exception as e: | |
return f"エラーが発生しました: {str(e)}", "" | |
def load(self): | |
"""保存済みのベクトルストアを読み込む""" | |
if os.path.exists("./chroma_db"): | |
try: | |
embeddings = HuggingFaceEmbeddings( | |
model_name="intfloat/multilingual-e5-base", | |
model_kwargs={'device': 'cpu'} | |
) | |
self.vectorstore = Chroma( | |
persist_directory="./chroma_db", | |
embedding_function=embeddings | |
) | |
# QAチェーンを初期化 | |
self._initialize_qa_chain() | |
self.processing_status = "準備完了" | |
return "保存済みの知識ベースを読み込みました" | |
except Exception as e: | |
self.processing_status = "エラー" | |
return f"知識ベースの読み込みに失敗しました: {str(e)}" | |
else: | |
self.processing_status = "初期化待ち" | |
return "知識ベースが見つかりません。ファイルをアップロードしてください。" | |
# Gradioインターフェースの作成 | |
def create_interface(): | |
# チャットボットのインスタンスを作成 | |
bot = ManualChatbot(docs_dir="./manuals") | |
# 保存済みデータがあれば読み込む | |
load_status = bot.load() | |
# Gradioインターフェース | |
with gr.Blocks(title="手順書チャットボット") as demo: | |
gr.Markdown("# 手順書チャットボット") | |
gr.Markdown("PDFやExcel、画像ファイルをアップロードして、それらの内容に関する質問に答えます。") | |
with gr.Tab("ファイルアップロード"): | |
upload_files = gr.File(file_count="multiple", label="PDFやExcel、画像ファイルをアップロード") | |
upload_button = gr.Button("処理開始") | |
status_output = gr.Textbox(label="ステータス", value=load_status) | |
upload_button.click( | |
fn=bot.process_uploaded_files, | |
inputs=[upload_files], | |
outputs=[status_output] | |
) | |
with gr.Tab("チャット"): | |
chatbot = gr.Chatbot(label="会話") | |
msg = gr.Textbox(label="質問を入力してください") | |
clear = gr.Button("クリア") | |
def respond(message, chat_history): | |
if not message.strip(): | |
return chat_history | |
# ボットに質問する | |
bot_response, sources = bot.ask(message) | |
# 回答とソース情報を組み合わせる | |
full_response = bot_response | |
if sources: | |
full_response += f"\n\n{sources}" | |
# チャット履歴を更新する | |
chat_history.append((message, full_response)) | |
return "", chat_history | |
msg.submit(respond, [msg, chatbot], [msg, chatbot]) | |
clear.click(lambda: None, None, chatbot, queue=False) | |
with gr.Tab("使い方"): | |
gr.Markdown(""" | |
## 使い方 | |
1. **ファイルアップロード**タブで、PDFファイル、Excelファイル、または画像ファイルをアップロードします。 | |
2. **処理開始**ボタンをクリックして、ファイルを処理します。 | |
3. 処理が完了したら**チャット**タブに移動します。 | |
4. 質問を入力して、手順書の内容に基づいた回答を得ることができます。 | |
## サポートしているファイル形式 | |
- PDF (.pdf) | |
- Excel (.xlsx, .xls) | |
- 画像ファイル (.png, .jpg, .jpeg) | |
## 注意事項 | |
- 大きなファイルの処理には時間がかかる場合があります。 | |
- 画像からのテキスト抽出(OCR)は言語によって精度が異なります。 | |
- 回答は参照元のドキュメントに基づいて生成されるため、データが不十分な場合は正確な回答ができない場合があります。 | |
""") | |
return demo | |
# Hugging Face Spacesで実行する場合のエントリーポイント | |
if __name__ == "__main__": | |
# Gradioインターフェースを作成して起動 | |
demo = create_interface() | |
demo.launch() |