|
|
|
import os |
|
import json
import hashlib
import re
import csv

import requests

from modules import app_constants, app_to_vectorstore, app_page_definitions, common_utils
from modules import app_logger

app_logger = app_logger.app_logger

work_dir = app_constants.WORKSPACE_DIRECTORY
system_content_file = app_constants.SYSTEM_CONTENT_DATA
|
|
|
def download_file(url): |
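    """Download the file at `url` into the workspace docs directory; return True on success, False on failure."""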
|
    try:
        response = requests.get(url)
        response.raise_for_status()
        sanitized_filename = sanitize_filename(url.split('/')[-1])
        sanitized_local_path = os.path.join(app_constants.WORKSPACE_DIRECTORY, "docs", sanitized_filename)
        with open(sanitized_local_path, 'wb') as f:
            f.write(response.content)
        app_logger.info(f"File downloaded successfully: {sanitized_local_path}")
        return True
    except Exception as e:
        app_logger.error(f"Failed to download file from {url}. Error: {e}")
        return False
|
|
|
def index_file(local_path, module): |
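    """Index `local_path` into the vector store for `module`; return the status from get_chroma_index, or False on failure."""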
|
    status = False
    try:
        status = app_to_vectorstore.get_chroma_index(local_path, module, True)
        app_logger.info(f"File indexed successfully: {local_path}")
    except Exception as e:
        app_logger.error(f"Failed to index file. Error: {e}")
    return status
|
|
|
def compute_md5(file_path): |
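    """Return the MD5 hex digest of the file at `file_path`, or None if the file cannot be read."""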
|
    hash_md5 = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            # Read in 4 KB chunks so large files do not need to fit in memory.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except Exception as e:
        app_logger.error(f"Error computing MD5 for {file_path}: {e}")
        return None
|
|
|
def sanitize_filename(filename):
    """Sanitize the filename by removing or replacing invalid characters and handling URLs."""
    # Strip any directory or URL path components.
    filename = os.path.basename(filename)
    # Lowercase and replace spaces with underscores.
    sanitized = filename.lower().replace(' ', '_')
    # Replace anything that is not a word character, hyphen, underscore, or dot.
    sanitized = re.sub(r'[^\w\-_\.]', '_', sanitized)
    # Keep the name within common filesystem limits, preserving the extension.
    max_length = 255
    if len(sanitized) > max_length:
        ext = os.path.splitext(sanitized)[1]
        sanitized = sanitized[:max_length - len(ext)] + ext
    return sanitized
|
|
|
def delete_files(work_dir=work_dir): |
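    """Delete every file and directory under `work_dir`, then remove local upload entries from the content data file."""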
|
    for root, dirs, files in os.walk(work_dir, topdown=False):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                os.unlink(file_path)
                app_logger.info(f"Deleted file: {file_path}")
            except Exception as e:
                app_logger.error(f"Failed to delete {file_path}. Reason: {e}")
        for name in dirs:
            dir_path = os.path.join(root, name)
            try:
                os.rmdir(dir_path)
                app_logger.info(f"Deleted directory: {dir_path}")
            except Exception as e:
                app_logger.error(f"Failed to delete {dir_path}. Reason: {e}")
    remove_local_uploads()
|
|
|
def save_uploaded_file(uploaded_file, uploads_path, sanitized_filename=None): |
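    """Save an uploaded file object to `uploads_path` under a sanitized name and return the resulting file path."""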
|
    if sanitized_filename is None:
        sanitized_filename = sanitize_filename(uploaded_file.name)
    file_path = os.path.join(uploads_path, sanitized_filename)

    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    app_logger.info(f"File '{sanitized_filename}' uploaded to {uploads_path}")
    return file_path
|
|
|
def perform_file_operation(resource, operation): |
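    """Run `operation` on a resource entry: "download" fetches the resource url, "learn" indexes the local copy."""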
|
    url = resource.get("url", "")
    content_type = resource.get("content_type", "")
    # The local copy of the resource lives under the workspace docs directory.
    file_name = os.path.join(work_dir, "docs", sanitize_filename(url))
    if operation == "download":
        if url:
            download_success = download_file(url)
            if download_success:
                app_logger.info(f"File {resource['name']} downloaded successfully.")
            else:
                app_logger.error(f"Failed to download file {resource['name']}.")
    elif operation == "learn":
        module = common_utils.get_content_mapping_to_module(content_type)
        index_file(file_name, module)
    else:
        app_logger.error(f"Unknown operation: {operation}")
|
|
|
|
|
def get_indexed_files_for_page(page_id): |
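    """Return the base names of processed documents recorded for `page_id`, or an empty list on error."""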
|
    try:
        filtered_files = []
        processed_docs_path = os.path.join(work_dir, app_constants.PROCESSED_DOCS)
        with open(processed_docs_path, mode='r', newline='', encoding='utf-8') as file:
            csv_reader = csv.reader(file)
            for row in csv_reader:
                # row[1] holds the page id and row[2] the stored file path.
                if len(row) > 2 and row[1].lower() == page_id.lower():
                    filtered_files.append(os.path.basename(row[2]))
        return filtered_files
    except Exception as e:
        app_logger.error(f"Could not read indexed files for page '{page_id}': {e}")
        return []
|
|
|
def update_json_file(data, file_path): |
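    """Write `data` to `file_path` as indented JSON."""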
|
    with open(file_path, "w") as file:
        json.dump(data, file, indent=4)
|
|
|
def load_json_data(file_path): |
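    """Load and return the JSON content of `file_path`."""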
|
    with open(file_path, "r") as file:
        return json.load(file)
|
|
|
def handle_content_update(uploaded_file=None, manual_name="", manual_url="", selected_content_type=""): |
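    """Add a new entry (uploaded file or manual URL) to the system content data file."""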
|
    system_content_file = app_constants.SYSTEM_CONTENT_DATA
    uploads_directory = os.path.join(work_dir, "docs")
    file_data = load_json_data(system_content_file)

    if uploaded_file:
        # Uploaded files are stored locally under the docs directory.
        filename = sanitize_filename(uploaded_file.name)
        file_path = save_file(uploaded_file, filename, uploads_directory)
    else:
        # Manual entries keep the remote URL as their location.
        filename = sanitize_filename(manual_url)
        file_path = manual_url

    new_entry = {"name": filename, "url": file_path, "content_type": selected_content_type}
    file_data.append(new_entry)
    update_json_file(file_data, system_content_file)
|
|
|
def save_file(uploaded_file, filename, directory): |
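    """Write an uploaded file object into `directory` (creating it if needed) and return the saved path."""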
|
    if not os.path.exists(directory):
        os.makedirs(directory)
    file_path = os.path.join(directory, filename)
    with open(file_path, "wb") as file:
        file.write(uploaded_file.getbuffer())
    return file_path
|
|
|
def remove_local_uploads(file_path=app_constants.SYSTEM_CONTENT_DATA): |
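    """Drop entries whose url points to a local relative path from the system content data file."""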
|
|
|
    with open(file_path, 'r') as file:
        data = json.load(file)

    # Keep only entries whose url does not point to a local relative path.
    filtered_data = [entry for entry in data if not entry['url'].startswith('./')]

    with open(file_path, 'w') as file:
        json.dump(filtered_data, file, indent=4)