import os
import json
import requests
import hashlib
import re
import csv

from modules import app_constants, app_to_vectorstore, app_page_definitions, common_utils
from modules import app_logger

app_logger = app_logger.app_logger
work_dir = app_constants.WORKSPACE_DIRECTORY
system_content_file = app_constants.SYSTEM_CONTENT_DATA

def download_file(url):
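    """Download *url* into the workspace docs directory; return True on success, False on failure."""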
    try:
        # A timeout keeps a dead or unresponsive host from hanging the whole operation.
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        sanitized_filename = sanitize_filename(url.split('/')[-1])
        sanitized_local_path = os.path.join(app_constants.WORKSPACE_DIRECTORY, "docs", sanitized_filename)
        with open(sanitized_local_path, 'wb') as f:
            f.write(response.content)
        app_logger.info(f"File downloaded successfully: {sanitized_local_path}")
        return True
    except Exception as e:
        app_logger.error(f"Failed to download file from {url}. Error: {e}")
        return False
|
|
def index_file(local_path, module):
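    """Index *local_path* into the vector store for the given module; return the indexing status (False on error)."""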
    try:
        status = app_to_vectorstore.get_chroma_index(local_path, module, True)
        app_logger.info(f"File indexed successfully: {local_path}")
    except Exception as e:
        app_logger.error(f"Failed to index file. Error: {e}")
        status = False
    return status

def compute_md5(file_path):
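    """Return the MD5 hex digest of the file at *file_path*, or None on error."""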
    hash_md5 = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            # Read in 4 KiB chunks so large files need not fit in memory.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except Exception as e:
        app_logger.error(f"Error computing MD5 for {file_path}: {e}")
        return None
|
|
def sanitize_filename(filename):
    """Sanitize the filename by removing or replacing invalid characters and handling URLs."""
    # Keep only the final path component (also strips URL directories).
    filename = os.path.basename(filename)

    # Normalize case and replace spaces.
    sanitized = filename.lower().replace(' ', '_')

    # Replace anything that is not a word character, hyphen, or dot.
    sanitized = re.sub(r'[^\w\-.]', '_', sanitized)

    # Trim to a filesystem-safe length while preserving the extension.
    max_length = 255
    if len(sanitized) > max_length:
        ext = os.path.splitext(sanitized)[1]
        sanitized = sanitized[:max_length - len(ext)] + ext
    return sanitized
|
|
def delete_files(work_dir=work_dir):
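    """Recursively delete all files and directories under *work_dir*, then clear local-upload entries."""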
    for root, dirs, files in os.walk(work_dir, topdown=False):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                os.unlink(file_path)
                app_logger.info(f"Deleted file: {file_path}")
            except Exception as e:
                app_logger.error(f"Failed to delete {file_path}. Reason: {e}")

        for name in dirs:
            dir_path = os.path.join(root, name)
            try:
                os.rmdir(dir_path)
                app_logger.info(f"Deleted directory: {dir_path}")
            except Exception as e:
                app_logger.error(f"Failed to delete {dir_path}. Reason: {e}")

    # Drop local-upload entries from the content registry as well.
    remove_local_uploads()
|
|
def save_uploaded_file(uploaded_file, uploads_path, sanitized_filename=None):
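    """Write an uploaded file to *uploads_path* under a sanitized name and return the saved path."""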
    if sanitized_filename is None:
        sanitized_filename = sanitize_filename(uploaded_file.name)
    file_path = os.path.join(uploads_path, sanitized_filename)

    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    app_logger.info(f"File '{sanitized_filename}' uploaded to {uploads_path}")
    return file_path
|
|
def perform_file_operation(resource, operation):
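    """Dispatch a 'download' or 'learn' operation for a resource dict with 'url', 'name', and 'content_type' keys."""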
    url = resource.get("url", "")
    content_type = resource.get("content_type", "")
    file_name = os.path.join(work_dir, "docs", sanitize_filename(url))
    if operation == "download":
        if url:
            download_success = download_file(url)
            if download_success:
                app_logger.info(f"File {resource['name']} downloaded successfully.")
            else:
                app_logger.error(f"Failed to download file {resource['name']}.")
    elif operation == "learn":
        module = common_utils.get_content_mapping_to_module(content_type)
        index_file(file_name, module)
    else:
        app_logger.error(f"Unknown operation: {operation}")
|
|
|
|
def get_indexed_files_for_page(page_id):
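    """Return the base names of files recorded as indexed for *page_id* in the processed-docs CSV."""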
    try:
        filtered_files = []

        with open(os.path.join(work_dir, app_constants.PROCESSED_DOCS), mode='r', newline='', encoding='utf-8') as file:
            csv_reader = csv.reader(file)
            for row in csv_reader:
                # Column 1 holds the page id; column 2 holds the stored file path.
                if len(row) > 2 and row[1].lower() == page_id.lower():
                    file_name = os.path.basename(row[2])
                    filtered_files.append(file_name)

        return filtered_files
    except Exception as e:
        app_logger.error(f"Failed to read indexed files for page '{page_id}': {e}")
        return []
|
|
def update_json_file(data, file_path):
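    """Write *data* to *file_path* as pretty-printed JSON."""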
    with open(file_path, "w") as file:
        json.dump(data, file, indent=4)
|
|
def load_json_data(file_path):
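    """Load and return JSON data from *file_path*."""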
    with open(file_path, "r") as file:
        return json.load(file)

def handle_content_update(uploaded_file=None, manual_name="", manual_url="", selected_content_type=""):
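    """Register new content (an uploaded file or a manual URL) in the system content JSON file."""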
    system_content_file = app_constants.SYSTEM_CONTENT_DATA
    uploads_directory = os.path.join(work_dir, "docs")
    file_data = load_json_data(system_content_file)

    if uploaded_file:
        filename = sanitize_filename(uploaded_file.name)
        file_path = save_file(uploaded_file, filename, uploads_directory)
    else:
        # No upload: register the remote URL directly.
        filename = sanitize_filename(manual_url)
        file_path = manual_url

    new_entry = {"name": filename, "url": file_path, "content_type": selected_content_type}
    file_data.append(new_entry)
    update_json_file(file_data, system_content_file)
|
|
def save_file(uploaded_file, filename, directory):
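    """Save *uploaded_file* as *filename* inside *directory*, creating the directory if needed."""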
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, filename)
    with open(file_path, "wb") as file:
        file.write(uploaded_file.getbuffer())
    return file_path
|
|
def remove_local_uploads(file_path=app_constants.SYSTEM_CONTENT_DATA):
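    """Remove entries whose 'url' points to a local upload (a path starting with './') from the content JSON."""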
    with open(file_path, 'r') as file:
        data = json.load(file)

    # Keep only entries whose URL is not a local upload path.
    filtered_data = [entry for entry in data if not entry['url'].startswith('./')]

    with open(file_path, 'w') as file:
        json.dump(filtered_data, file, indent=4)