| from pathlib import Path |
| import os |
| import shutil |
| import string |
| import secrets |
| import hashlib |
| import random |
| import time |
| import re |
|
|
| def get_files_count(directory_path): |
| return len(os.listdir(directory_path)) |
|
|
| def generate_random_string(length=10): |
| characters = string.ascii_letters |
| random_string = ''.join(secrets.choice(characters) for _ in range(length)) |
| return random_string |
|
|
| def generate_random_string_from_input(input_string, length=16): |
| |
| hash_object = hashlib.sha256(input_string.encode()) |
| hashed_string = hash_object.hexdigest() |
|
|
| |
| random.seed(hashed_string) |
|
|
| |
| characters = string.ascii_letters + string.digits |
| random_string = ''.join(random.choice(characters) for _ in range(length)) |
|
|
| return random_string |
|
|
| def is_mostly_black(frame, black_threshold=20, percentage_threshold=0.9, sample_rate=10): |
| """ |
| Fast black frame detection using pixel sampling. |
| |
| Args: |
| frame: OpenCV BGR frame (NumPy array) |
| black_threshold: grayscale value below which a pixel is considered black |
| percentage_threshold: fraction of black pixels to consider frame mostly black |
| sample_rate: sample every N-th pixel in both dimensions (higher = faster) |
| Returns: |
| True if mostly black, False otherwise |
| """ |
| import cv2 |
| import numpy as np |
| if frame is None or frame.size == 0: |
| return True |
| |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
| |
| sampled = gray[::sample_rate, ::sample_rate] |
| black_count = np.sum(sampled < black_threshold) |
| total_count = sampled.size |
| return (black_count / total_count) >= percentage_threshold |
|
|
| def only_alpha(text: str) -> str: |
| |
| return re.sub(r'[^a-zA-Z]', '', text).lower() |
|
|
| def manage_gpu(size_gb: float = 0, gpu_index: int = 0, action: str = "check"): |
| """ |
| Manage GPU memory: |
| - check → just prints memory + process table |
| - clear_cache → clears PyTorch cache |
| - kill → kills all GPU processes |
| """ |
| try: |
| import pynvml,signal, gc |
| pynvml.nvmlInit() |
| handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index) |
| info = pynvml.nvmlDeviceGetMemoryInfo(handle) |
|
|
| free_gb = info.free / 1024**3 |
| total_gb = info.total / 1024**3 |
|
|
| print(f"\nGPU {gpu_index}: Free {free_gb:.2f} GB / Total {total_gb:.2f} GB") |
|
|
| |
| processes = pynvml.nvmlDeviceGetComputeRunningProcesses(handle) |
| print("\nActive GPU Processes:") |
| print(f"{'PID':<8} {'Process Name':<40} {'Used (GB)':<10}") |
| print("-" * 60) |
| for p in processes: |
| used_gb = p.usedGpuMemory / 1024**3 |
| proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore") |
| print(f"{p.pid:<8} {proc_name:<40} {used_gb:.2f}") |
|
|
| if action == "clear_cache": |
| try: |
| import torch |
| gc.collect() |
| gc.collect() |
| torch.cuda.empty_cache() |
| torch.cuda.reset_peak_memory_stats() |
| torch.cuda.synchronize() |
| time.sleep(1) |
| print("\n🧹 Cleared PyTorch CUDA cache") |
| except ImportError: |
| print("\n⚠️ PyTorch not installed, cannot clear cache.") |
|
|
| elif action == "kill": |
| for p in processes: |
| proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore") |
| try: |
| os.kill(p.pid, signal.SIGKILL) |
| print(f"❌ Killed {p.pid} ({proc_name})") |
| except Exception as e: |
| print(f"⚠️ Could not kill {p.pid}: {e}") |
| manage_gpu(action="clear_cache") |
| gc.collect() |
| gc.collect() |
| return free_gb > size_gb |
| except: return False |
|
|
| def is_gpu_available(verbose=True): |
| import torch |
| if not torch.cuda.is_available(): |
| if verbose: |
| print("CUDA not available.") |
| return False |
| |
| try: |
| |
| torch.empty(1, device="cuda") |
| if verbose: |
| print(f"CUDA available. Using device: {torch.cuda.get_device_name(0)}") |
| return True |
| except RuntimeError as e: |
| if "CUDA-capable device(s) is/are busy or unavailable" in str(e) or \ |
| "CUDA error" in str(e): |
| if verbose: |
| print("CUDA detected but busy/unavailable. Please CPU.") |
| return False |
| raise |