import os
import random
import shutil
import string
import sys
import zipfile
from urllib.parse import urljoin

import nltk
import requests

from application.core.settings import settings
from application.parser.file.bulk import SimpleDirectoryReader
from application.parser.open_ai_func import call_openai_api
from application.parser.schema.base import Document
from application.parser.token_func import group_split

# Ensure the NLTK resources used during parsing are available.
try:
    nltk.download('punkt', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
except FileExistsError:
    pass


def metadata_from_filename(title):
    # The "store" identifier is built from the second and third path segments of the title.
    store = '/'.join(title.split('/')[1:3])
    return {'title': title, 'store': store}


def generate_random_string(length):
    # Return a string of ASCII letters chosen at random.
    return ''.join(random.choice(string.ascii_letters) for _ in range(length))


current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


def ingest_worker(self, directory, formats, name_job, filename, user):
    """
    Ingest and process documents.

    Args:
        self: Reference to the instance of the task.
        directory (str): Specifies the directory for ingesting ('inputs' or 'temp').
        formats (list of str): List of file extensions to consider for ingestion (e.g., [".rst", ".md"]).
        name_job (str): Name of the job for this ingestion task.
        filename (str): Name of the file to be ingested.
        user (str): Identifier for the user initiating the ingestion.

    Returns:
        dict: Information about the completed ingestion task, including input parameters and a "limited" flag.
    """
    input_files = None
    recursive = True
    limit = None
    exclude = True

    sample = False
    token_check = True
    min_tokens = 150
    max_tokens = 1250
    full_path = directory + '/' + user + '/' + name_job
    print(full_path, file=sys.stderr)
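
    # Fetch the uploaded file from the application API and write it under full_path.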
    file_data = {'name': name_job, 'file': filename, 'user': user}
    response = requests.get(urljoin(settings.API_URL, "/api/download"), params=file_data)
    print(response, file=sys.stderr)
    file = response.content

    if not os.path.exists(full_path):
        os.makedirs(full_path)
    with open(full_path + '/' + filename, 'wb') as f:
        f.write(file)
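
    # If a zip archive was uploaded, unpack it in place and remove the archive itself.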
    if filename.endswith('.zip'):
        with zipfile.ZipFile(full_path + '/' + filename, 'r') as zip_ref:
            zip_ref.extractall(full_path)
        os.remove(full_path + '/' + filename)

    self.update_state(state='PROGRESS', meta={'current': 1})
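
    # Load the files, split them into token-bounded chunks, and convert them to LangChain format.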
    raw_docs = SimpleDirectoryReader(input_dir=full_path, input_files=input_files, recursive=recursive,
                                     required_exts=formats, num_files_limit=limit,
                                     exclude_hidden=exclude, file_metadata=metadata_from_filename).load_data()
    raw_docs = group_split(documents=raw_docs, min_tokens=min_tokens, max_tokens=max_tokens, token_check=token_check)

    docs = [Document.to_langchain_format(raw_doc) for raw_doc in raw_docs]
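
    # Build the vector index from the chunked documents; the task instance is passed through so progress can be reported.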
    call_openai_api(docs, full_path, self)
    self.update_state(state='PROGRESS', meta={'current': 100})

    if sample:
        for i in range(min(5, len(raw_docs))):
            print(raw_docs[i].text)
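
    # Upload the finished index back to the application API, then clean up the working directory.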
    file_data = {'name': name_job, 'user': user}
    if settings.VECTOR_STORE == "faiss":
        with open(full_path + '/index.faiss', 'rb') as file_faiss, \
                open(full_path + '/index.pkl', 'rb') as file_pkl:
            files = {'file_faiss': file_faiss, 'file_pkl': file_pkl}
            response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data)
        response = requests.get(urljoin(settings.API_URL, "/api/delete_old?path=" + full_path))
    else:
        response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data)

    shutil.rmtree(full_path)
    return {
        'directory': directory,
        'formats': formats,
        'name_job': name_job,
        'filename': filename,
        'user': user,
        'limited': False
    }
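

# A minimal local-invocation sketch, not part of the application. In the real
# project this function is expected to run as a bound Celery task registered
# elsewhere; the fake task object and the 'temp', 'docs.zip', and 'local'
# arguments below are illustrative assumptions, and running it requires the
# application API at settings.API_URL to be reachable.
if __name__ == "__main__":

    class _FakeTask:
        """Stand-in for the bound task, exposing only update_state()."""

        def update_state(self, state=None, meta=None):
            print(f"update_state: state={state}, meta={meta}")

    result = ingest_worker(_FakeTask(), 'temp', ['.rst', '.md'], 'example-job', 'docs.zip', 'local')
    print(result)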