| """ | |
| Semantic sentence compression module using clustering-based redundancy removal. | |
| This module implements adaptive semantic compression by grouping semantically | |
| similar sentences and selecting representative centroids, reducing token count | |
| while preserving core meaning and constraints. | |
| Mathematical Foundations | |
| ------------------------ | |
| 1. Embedding Variance Filtering: | |
| Given embeddings E ∈ ℝⁿˣᵈ, variance per sentence: | |
| varᵢ = (1/d) Σⱼ (eᵢⱼ - μᵢ)² where μᵢ = mean(eᵢ) | |
| Sentences with varᵢ < τ_variance are discarded as low-information. | |
| Reference: Jolliffe, "Principal Component Analysis" [1] | |
| 2. Euclidean Distance Matrix: | |
| Dᵢⱼ = ||eᵢ - eⱼ||₂ = √(Σₖ (eᵢₖ - eⱼₖ)²) | |
| Computed via scipy.spatial.distance.cdist with O(n²·d) complexity. | |
| Reference: scipy.spatial.distance documentation [2] | |
| 3. Hierarchical Clustering (Ward's Method): | |
| Minimizes within-cluster variance at each merge: | |
| Δ(A,B) = [n_A·n_B / (n_A + n_B)] · ||μ_A - μ_B||₂² | |
| Time complexity: O(n³) naive; O(n²) with the nearest-neighbor chain algorithm SciPy uses for Ward linkage (plus O(n²·d) to compute the distances). | |
| Reference: Ward, "Hierarchical Grouping to Optimize an Objective Function" [3] | |
| 4. K-Means Clustering (MiniBatch variant): | |
| Objective: minimize Σᵢ ||eᵢ - c_{label(i)}||₂² | |
| MiniBatch: uses random subsets of size b << n per iteration. | |
| Time complexity: O(n·k·d·i) where i=iterations, k=clusters. | |
| Reference: Sculley, "Web-Scale K-Means Clustering" [4] | |
| 5. Adaptive Threshold via Percentile: | |
| cutoff = P(aggressiveness·100) of upper-triangular distance matrix | |
| Higher percentile → larger cutoff → more merging → higher compression. | |
| Mathematical: cutoff = quantile({Dᵢⱼ : i < j}, q=aggressiveness) | |
| 6. Centroid Selection within Clusters: | |
| For cluster C with embeddings {e₁, ..., eₘ}: | |
| centroid μ = (1/m) Σᵢ eᵢ | |
| representative = argmin_{e∈C} ||e - μ||₂ | |
| Ensures selected sentence is most central semantically. | |
| Compression Ratio Formula | |
| ------------------------- | |
| R = 1 - (|S_compressed| / |S_original|) ∈ [0, 1] | |
| Where |S| is the number of sentences in S (S_original = input, S_compressed = output). | |
| Effective aggressiveness α_eff ∈ [0, 1] controls target ratio: | |
| n_clusters ≈ |S_normal| · (1 - α_eff) (for K-Means; S_normal = unprotected sentences that pass the variance filter) | |
| cutoff_distance ≈ P(α_eff·100) (for Ward) | |
| References | |
| ---------- | |
| [1] Jolliffe, I. T. (2002). Principal Component Analysis, 2nd ed. Springer. | |
| [2] Virtanen, P., et al. (2020). SciPy 1.0: Fundamental Algorithms for | |
| Scientific Computing in Python. Nature Methods. | |
| https://github.com/scipy/scipy | |
| [3] Ward, J. H. (1963). Hierarchical Grouping to Optimize an Objective | |
| Function. Journal of the American Statistical Association, 58(301), 236-244. | |
| https://doi.org/10.1080/01621459.1963.10500845 | |
| [4] Sculley, D. (2010). Web-Scale K-Means Clustering. WWW 2010. | |
| https://github.com/google-research/google-research/tree/master/sculley_kmeans | |
| [5] Reimers, N., & Gurevych, I. (2019). Sentence-BERT: Sentence embeddings | |
| using Siamese BERT-networks. EMNLP-IJCNLP 2019. | |
| https://github.com/UKPLab/sentence-transformers | |
| Performance Characteristics | |
| --------------------------- | |
| - _remove_low_variance(): O(n·d) where n=sentences, d=embedding_dim | |
| - _compute_condensed_distance(): O(n²·d) for pairwise Euclidean distances | |
| - _cluster_ward(): O(n²·d + n² log n) for linkage + tree cutting | |
| - _cluster_kmeans(): O(n·k·d·i) with MiniBatch optimization (i≈10-100) | |
| - compress() full pipeline: | |
| * Small n (<200): O(n²·d) dominated by Ward clustering | |
| * Large n (≥200): O(n·k·d·i) dominated by K-Means | |
| - Memory: O(n²) for distance matrix (Ward), O(n·d) for K-Means | |
| Author: IntelliDeep Labs Team | |
| License: BSL 1.1 | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import time | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| from scipy.cluster.hierarchy import linkage, fcluster | |
| from scipy.spatial.distance import cdist, squareform | |
| from sklearn.cluster import MiniBatchKMeans | |
| logger = logging.getLogger(__name__) | |
| class SemanticCompressor: | |
| """ | |
| Adaptive semantic compressor using clustering-based redundancy removal. | |
| This class reduces sentence count by grouping semantically similar sentences | |
| and selecting representative centroids, while preserving: | |
| - Protected placeholders (__PROT_*) from code/PII shielding | |
| - Domain-specific compression aggressiveness presets | |
| - Original sentence ordering for coherent reconstruction | |
| Key Features | |
| ------------ | |
| - Dual clustering backends: Ward (hierarchical, precise) or K-Means (fast) | |
| - Auto-selection based on sentence count threshold | |
| - Variance-based filtering to discard low-information sentences | |
| - Adaptive distance cutoff via percentile of empirical distance distribution | |
| - Async support for non-blocking batch processing | |
| Compression Strategy | |
| -------------------- | |
| 1. Separate protected sentences (placeholders) from compressible content | |
| 2. Filter out low-variance embeddings (noise reduction) | |
| 3. Cluster remaining sentences using selected algorithm: | |
| * Ward: Hierarchical agglomerative clustering with variance minimization | |
| * K-Means: Partition-based clustering with MiniBatch optimization | |
| 4. Select centroid sentence from each cluster (closest to mean embedding) | |
| 5. Merge protected + compressed sentences preserving original order | |
| Mathematical Parameters | |
| ----------------------- | |
| - aggressiveness ∈ [0, 1]: Target compression intensity | |
| * 0.0: No compression (keep all sentences) | |
| * 1.0: Maximum compression (one sentence per semantic group) | |
| * Default presets by mode: legal=0.25, finance=0.30, code=0.45, general=0.40 | |
| - min_variance ≥ 0: Threshold for discarding low-information sentences | |
| * Computed as variance across embedding dimensions | |
| * Sentences with var < min_variance are considered noise | |
| - auto_method_threshold: Sentence count at which to switch from Ward to K-Means | |
| * Ward: O(n²) memory, precise for small n | |
| * K-Means: O(n) memory, scalable for large n | |
| Usage Example | |
| ------------- | |
| >>> compressor = SemanticCompressor(aggressiveness=0.3) | |
| >>> compressed_sentences, stats = compressor.compress(sentences, embeddings) | |
| >>> print(f"Compression: {stats['original_count']} → {stats['compressed_count']} " | |
| ... f"({stats['compression_ratio']:.1%} reduction)") | |
| """ | |
| # Domain-specific aggressiveness presets (empirically tuned) | |
| # Higher values = more aggressive compression (fewer output sentences) | |
| _MODE_AGGRESSIVENESS: Dict[str, float] = { | |
| "legal": 0.25, # Conservative: preserve legal nuance | |
| "finance": 0.30, # Moderate: balance precision and brevity | |
| "code": 0.45, # Aggressive: code is repetitive by nature | |
| "general": 0.40, # Balanced default | |
| } | |
| # Default configuration values | |
| _DEFAULT_AGGRESSIVENESS: float = 0.25 | |
| _DEFAULT_MIN_VARIANCE: float = 0.0 | |
| _DEFAULT_AUTO_METHOD_THRESHOLD: int = 200 | |
| def __init__( | |
| self, | |
| aggressiveness: float = _DEFAULT_AGGRESSIVENESS, | |
| min_variance: float = _DEFAULT_MIN_VARIANCE, | |
| mode: Optional[str] = None, | |
| method: Optional[str] = None, | |
| auto_method_threshold: int = _DEFAULT_AUTO_METHOD_THRESHOLD, | |
| ) -> None: | |
| """ | |
| Initialize the SemanticCompressor. | |
| Parameters | |
| ---------- | |
| aggressiveness : float, optional | |
| Target compression intensity in [0, 1]. Higher = more compression. | |
| Overridden by mode preset if mode is specified. | |
| min_variance : float, optional | |
| Minimum embedding variance to retain a sentence. Sentences with | |
| variance below this threshold are discarded as low-information. | |
| mode : Optional[str], optional | |
| Domain mode: "legal", "finance", "code", or "general". | |
| Sets the aggressiveness preset. Unrecognized modes are ignored and the | |
| explicit aggressiveness value is used instead. | |
| method : Optional[str], optional | |
| Clustering algorithm: "ward" (hierarchical) or "kmeans" (partition). | |
| If None, auto-selects based on sentence count vs auto_method_threshold. | |
| auto_method_threshold : int, optional | |
| Sentence count threshold for auto-selecting clustering method. | |
| Below: use Ward (precise). At or above: use K-Means (scalable). | |
| Raises | |
| ------ | |
| ValueError | |
| If aggressiveness not in [0, 1] or min_variance < 0. | |
| Complexity | |
| ---------- | |
| Time: O(1) initialization | |
| Space: O(1) additional state | |
| """ | |
| # Validate parameters | |
| if not 0.0 <= aggressiveness <= 1.0: | |
| raise ValueError(f"aggressiveness must be in [0, 1], got {aggressiveness}") | |
| if min_variance < 0: | |
| raise ValueError(f"min_variance must be >= 0, got {min_variance}") | |
| # Store base configuration | |
| self._base_aggressiveness = aggressiveness | |
| self.min_variance = min_variance | |
| self.mode = mode | |
| self.method = method | |
| self.auto_method_threshold = auto_method_threshold | |
| # Resolve effective aggressiveness (mode preset overrides explicit value) | |
| if mode is not None and mode in self._MODE_AGGRESSIVENESS: | |
| self.aggressiveness = self._MODE_AGGRESSIVENESS[mode] | |
| logger.debug(f"Mode '{mode}' preset: aggressiveness={self.aggressiveness}") | |
| else: | |
| self.aggressiveness = aggressiveness | |
| logger.info( | |
| f"SemanticCompressor initialized: aggressiveness={self.aggressiveness:.2f}, " | |
| f"min_variance={self.min_variance}, mode={mode or 'manual'}, " | |
| f"method={method or 'auto'}, auto_threshold={auto_method_threshold}" | |
| ) | |
| def _remove_low_variance( | |
| self, | |
| sentences: List[str], | |
| embeddings: np.ndarray, | |
| ) -> Tuple[List[str], np.ndarray, List[int]]: | |
| """ | |
| Filter out sentences with low embedding variance (low information content). | |
| Sentences whose embeddings have low variance across dimensions are likely | |
| generic, repetitive, or semantically empty. Removing them improves | |
| compression quality by focusing on informative content. | |
| Parameters | |
| ---------- | |
| sentences : List[str] | |
| List of sentence strings. | |
| embeddings : np.ndarray | |
| Array of shape (n_sentences, embedding_dim) with sentence embeddings. | |
| Returns | |
| ------- | |
| Tuple[List[str], np.ndarray, List[int]] | |
| - Filtered sentences list | |
| - Filtered embeddings array | |
| - Original indices of kept sentences (for order preservation) | |
| Mathematical Formulation | |
| ------------------------ | |
| For each embedding eᵢ ∈ ℝᵈ: | |
| varianceᵢ = (1/d) Σⱼ (eᵢⱼ - μᵢ)² | |
| where μᵢ = (1/d) Σⱼ eᵢⱼ (mean of embedding components) | |
| Keep sentence i iff: varianceᵢ >= min_variance | |
| Complexity | |
| ---------- | |
| Time: O(n·d) where n=sentences, d=embedding_dim | |
| Space: O(n) for variance array + output lists | |
| Reference | |
| --------- | |
| [1] Jolliffe, I. T. (2002). Principal Component Analysis. | |
| """ | |
| if len(sentences) == 0: | |
| return [], np.array([]).reshape(0, embeddings.shape[1] if embeddings.ndim > 1 else 0), [] | |
| # Compute variance across embedding dimensions for each sentence | |
| variances = np.var(embeddings, axis=1) | |
| # Boolean mask: keep sentences with variance >= threshold | |
| mask = variances >= self.min_variance | |
| kept_indices = np.where(mask)[0].tolist() | |
| discarded_count = len(sentences) - len(kept_indices) | |
| if discarded_count > 0: | |
| logger.info(f"Filtered {discarded_count} low-variance sentences " | |
| f"(variance < {self.min_variance:.4f})") | |
| # Return filtered data with original indices for order tracking | |
| return ( | |
| [sentences[i] for i in kept_indices], | |
| embeddings[kept_indices], | |
| kept_indices, | |
| ) | |
| def _compute_condensed_distance(self, embeddings: np.ndarray) -> np.ndarray: | |
| """ | |
| Compute condensed distance matrix for hierarchical clustering. | |
| Converts square distance matrix to condensed form (upper triangle only) | |
| as required by scipy.cluster.hierarchy.linkage(). | |
| Parameters | |
| ---------- | |
| embeddings : np.ndarray | |
| Array of shape (n, d) with sentence embeddings. | |
| Returns | |
| ------- | |
| np.ndarray | |
| Condensed distance matrix of length n*(n-1)/2. | |
| Mathematical Note | |
| ----------------- | |
| Input: Square distance matrix D ∈ ℝⁿˣⁿ where Dᵢⱼ = ||eᵢ - eⱼ||₂ | |
| Output: Condensed vector containing Dᵢⱼ for all i < j | |
| Length of output: n·(n-1)/2 (number of unique pairs) | |
| Complexity | |
| ---------- | |
| Time: O(n²·d) for pairwise Euclidean distances via cdist | |
| Space: O(n²) for full distance matrix, O(n²/2) for condensed output | |
| Reference | |
| --------- | |
| [2] scipy.spatial.distance.squareform documentation | |
| """ | |
| # Compute full pairwise Euclidean distance matrix | |
| dist_square = cdist(embeddings, embeddings, metric="euclidean") | |
| # Zero diagonal (distance to self) to avoid numerical issues | |
| np.fill_diagonal(dist_square, 0) | |
| # Convert to condensed form (upper triangle, flattened) | |
| return squareform(dist_square, checks=True) | |
| def _compute_percentile_cutoff( | |
| self, | |
| dist_square: np.ndarray, | |
| aggressiveness: float | |
| ) -> float: | |
| """ | |
| Compute adaptive distance cutoff using percentile of empirical distribution. | |
| The cutoff determines cluster merging threshold in Ward clustering: | |
| clusters with inter-cluster distance < cutoff are merged. | |
| Parameters | |
| ---------- | |
| dist_square : np.ndarray | |
| Square pairwise distance matrix (n x n). | |
| aggressiveness : float | |
| Compression intensity in [0, 1]. Lower = smaller cutoff, fewer merges. | |
| Returns | |
| ------- | |
| float | |
| Distance threshold for cluster cutting. | |
| Mathematical Formulation | |
| ------------------------ | |
| Let U = {Dᵢⱼ : 0 <= i < j < n} be upper-triangular distances. | |
| cutoff = percentile(U, q = aggressiveness * 100) | |
| Interpretation: | |
| - aggressiveness=0.0 → cutoff=min(U) → only the closest pair(s) merge → minimal compression | |
| - aggressiveness=1.0 → cutoff=max(U) → most sentences merge → near-maximum compression | |
| - aggressiveness=0.3 → cutoff=30th percentile → moderate merging | |
| Complexity | |
| ---------- | |
| Time: O(n²) to extract upper triangle + O(n² log n) for percentile | |
| Space: O(n²) for temporary distance array | |
| Note | |
| ---- | |
| For large n, consider sampling distances for approximate percentile. | |
| """ | |
| # Extract upper triangle (unique pairwise distances, exclude diagonal) | |
| triu_indices = np.triu_indices_from(dist_square, k=1) | |
| all_distances = dist_square[triu_indices] | |
| if len(all_distances) == 0: | |
| return 0.0 | |
| # Compute percentile cutoff | |
| percentile_q = aggressiveness * 100 | |
| cutoff = float(np.percentile(all_distances, percentile_q)) | |
| logger.debug( | |
| f"Percentile cutoff: P({percentile_q:.1f}) = {cutoff:.4f} " | |
| f"(range: [{all_distances.min():.4f}, {all_distances.max():.4f}])" | |
| ) | |
| return cutoff | |
| def _cluster_kmeans( | |
| self, | |
| sentences: List[str], | |
| embeddings: np.ndarray, | |
| aggressiveness: float, | |
| original_indices: List[int], | |
| ) -> Tuple[List[str], List[int]]: | |
| """ | |
| Cluster sentences using MiniBatch K-Means and select centroid representatives. | |
| MiniBatch K-Means updates centroids from small random batches and needs no | |
| O(n²) pairwise distance matrix (unlike Ward), making it suitable for large | |
| sentence sets with typically only a small loss in cluster quality. | |
| Parameters | |
| ---------- | |
| sentences : List[str] | |
| List of sentence strings to cluster. | |
| embeddings : np.ndarray | |
| Array of shape (n, d) with sentence embeddings. | |
| aggressiveness : float | |
| Compression intensity: determines number of clusters as | |
| n_clusters = max(1, int(n * (1 - aggressiveness))). | |
| original_indices : List[int] | |
| Original positions of sentences for order preservation. | |
| Returns | |
| ------- | |
| Tuple[List[str], List[int]] | |
| - Selected representative sentences (one per cluster) | |
| - Their original indices (sorted for order preservation) | |
| Algorithm | |
| --------- | |
| 1. Compute target clusters: k = max(1, int(n * (1 - aggressiveness))) | |
| 2. Fit MiniBatchKMeans with k clusters | |
| 3. For each cluster: | |
| a. Compute centroid as mean of member embeddings | |
| b. Select sentence closest to centroid (most representative) | |
| 4. Sort selected sentences by original index | |
| Complexity | |
| ---------- | |
| Time: O(n·k·d·i) where i=iterations (typically 10-100) | |
| Space: O(n·d + k·d) for embeddings + centroids | |
| Reference | |
| --------- | |
| [4] Sculley, D. (2010). Web-Scale K-Means Clustering. | |
| """ | |
| n = len(sentences) | |
| if n == 0: | |
| return [], [] | |
| # Compute target number of clusters based on aggressiveness | |
| n_clusters = max(1, int(n * (1.0 - aggressiveness))) | |
| logger.debug( | |
| f"K-Means clustering: n={n}, aggressiveness={aggressiveness:.2f} → " | |
| f"n_clusters={n_clusters}" | |
| ) | |
| # Fit MiniBatch K-Means (efficient for large n) | |
| # Normalize embeddings shape: ensure 2D (n_samples, n_features) | |
| if embeddings.ndim == 3: | |
| # Common pattern: (n, 1, d) → squeeze singleton middle dim | |
| if embeddings.shape[1] == 1: | |
| embeddings = embeddings.reshape(embeddings.shape[0], embeddings.shape[2]) | |
| logger.debug("Squeezed embeddings from 3D to 2D for KMeans") | |
| else: | |
| embeddings = embeddings.reshape(embeddings.shape[0], -1) | |
| logger.warning( | |
| "Flattened 3D embeddings to 2D for KMeans; verify embedding generation" | |
| ) | |
| elif embeddings.ndim != 2: | |
| raise ValueError(f"Embeddings must be 2D array, got ndim={embeddings.ndim}") | |
| kmeans = MiniBatchKMeans( | |
| n_clusters=n_clusters, | |
| random_state=42, # Reproducibility | |
| n_init="auto", # Use auto initialization for modern sklearn | |
| max_iter=300, | |
| batch_size=min(100, n), # Adaptive batch size | |
| ) | |
| labels = kmeans.fit_predict(embeddings) | |
| selected_sentences: List[str] = [] | |
| selected_indices: List[int] = [] | |
| # Select most central sentence from each cluster | |
| for cluster_id in np.unique(labels): | |
| # Indices of sentences in this cluster | |
| cluster_mask = labels == cluster_id | |
| cluster_indices = np.where(cluster_mask)[0] | |
| if len(cluster_indices) == 1: | |
| # Singleton cluster: keep the only sentence | |
| rel_idx = cluster_indices[0] | |
| selected_sentences.append(sentences[rel_idx]) | |
| selected_indices.append(original_indices[rel_idx]) | |
| continue | |
| # Compute centroid and find closest sentence | |
| cluster_embeddings = embeddings[cluster_indices] | |
| centroid = np.mean(cluster_embeddings, axis=0, keepdims=True) | |
| # Euclidean distance from each member to centroid | |
| distances_to_centroid = cdist( | |
| cluster_embeddings, centroid, metric="euclidean" | |
| ).flatten() | |
| # Select sentence with minimum distance to centroid | |
| best_rel_idx = cluster_indices[np.argmin(distances_to_centroid)] | |
| selected_sentences.append(sentences[best_rel_idx]) | |
| selected_indices.append(original_indices[best_rel_idx]) | |
| # Sort by original index to preserve document order | |
| order = np.argsort(selected_indices) | |
| return ( | |
| [selected_sentences[i] for i in order], | |
| [selected_indices[i] for i in order], | |
| ) | |
| def _cluster_ward( | |
| self, | |
| sentences: List[str], | |
| embeddings: np.ndarray, | |
| aggressiveness: float, | |
| original_indices: List[int], | |
| ) -> Tuple[List[str], List[int]]: | |
| """ | |
| Cluster sentences using Ward's hierarchical method and select representatives. | |
| Ward's method minimizes within-cluster variance at each merge step, | |
| producing high-quality clusters for semantic grouping. Best for small n. | |
| Parameters | |
| ---------- | |
| sentences : List[str] | |
| List of sentence strings to cluster. | |
| embeddings : np.ndarray | |
| Array of shape (n, d) with sentence embeddings. | |
| aggressiveness : float | |
| Compression intensity: determines distance cutoff for cluster cutting. | |
| original_indices : List[int] | |
| Original positions of sentences for order preservation. | |
| Returns | |
| ------- | |
| Tuple[List[str], List[int]] | |
| - Selected representative sentences (one per cluster) | |
| - Their original indices (sorted for order preservation) | |
| Algorithm | |
| --------- | |
| 1. Compute pairwise Euclidean distance matrix | |
| 2. Build linkage tree using Ward's variance-minimizing criterion | |
| 3. Cut tree at adaptive distance threshold (percentile-based) | |
| 4. For each cluster: select sentence closest to cluster centroid | |
| 5. Sort selected sentences by original index | |
| Complexity | |
| ---------- | |
| Time: O(n²·d) for distances + O(n² log n) for linkage + O(n·k·d) for selection | |
| Space: O(n²) for distance matrix + O(n) for linkage tree | |
| Reference | |
| --------- | |
| [3] Ward, J. H. (1963). Hierarchical Grouping to Optimize an Objective Function. | |
| """ | |
| n = len(sentences) | |
| if n <= 1: | |
| return sentences.copy(), original_indices.copy() | |
| # Compute pairwise Euclidean distances | |
| # Ensure embeddings are 2D | |
| if embeddings.ndim == 3: | |
| if embeddings.shape[1] == 1: | |
| embeddings = embeddings.reshape(embeddings.shape[0], embeddings.shape[2]) | |
| logger.debug("Squeezed embeddings from 3D to 2D for Ward clustering") | |
| else: | |
| embeddings = embeddings.reshape(embeddings.shape[0], -1) | |
| logger.warning( | |
| "Flattened 3D embeddings to 2D for Ward clustering; verify embedding generation" | |
| ) | |
| elif embeddings.ndim != 2: | |
| raise ValueError(f"Embeddings must be 2D array, got ndim={embeddings.ndim}") | |
| dist_square = cdist(embeddings, embeddings, metric="euclidean") | |
| np.fill_diagonal(dist_square, 0) | |
| # Convert to condensed form for scipy linkage (reuse dist_square instead of recomputing distances) | |
| condensed_dist = squareform(dist_square, checks=False) | |
| # Build hierarchical clustering tree (Ward's method) | |
| linkage_matrix = linkage(condensed_dist, method="ward") | |
| # Compute adaptive cutoff based on aggressiveness | |
| cutoff = self._compute_percentile_cutoff(dist_square, aggressiveness) | |
| # Cut tree to form flat clusters | |
| labels = fcluster(linkage_matrix, t=cutoff, criterion="distance") | |
| n_clusters = len(np.unique(labels)) | |
| logger.debug( | |
| f"Ward clustering: n={n}, cutoff={cutoff:.4f} → {n_clusters} clusters" | |
| ) | |
| selected_sentences: List[str] = [] | |
| selected_indices: List[int] = [] | |
| # Select most central sentence from each cluster | |
| for cluster_id in np.unique(labels): | |
| cluster_mask = labels == cluster_id | |
| cluster_indices = np.where(cluster_mask)[0] | |
| if len(cluster_indices) == 1: | |
| rel_idx = cluster_indices[0] | |
| selected_sentences.append(sentences[rel_idx]) | |
| selected_indices.append(original_indices[rel_idx]) | |
| continue | |
| # Compute centroid and find closest sentence | |
| cluster_embeddings = embeddings[cluster_indices] | |
| centroid = np.mean(cluster_embeddings, axis=0, keepdims=True) | |
| distances = cdist(cluster_embeddings, centroid, metric="euclidean").flatten() | |
| best_rel_idx = cluster_indices[np.argmin(distances)] | |
| selected_sentences.append(sentences[best_rel_idx]) | |
| selected_indices.append(original_indices[best_rel_idx]) | |
| # Sort by original index to preserve document order | |
| order = np.argsort(selected_indices) | |
| return ( | |
| [selected_sentences[i] for i in order], | |
| [selected_indices[i] for i in order], | |
| ) | |
| def compress( | |
| self, | |
| sentences: List[str], | |
| embeddings: np.ndarray, | |
| aggressiveness: Optional[float] = None, | |
| mode: Optional[str] = None, | |
| ) -> Tuple[List[str], Dict[str, Any]]: | |
| """ | |
| Compress sentences by clustering semantically similar ones. | |
| Main entry point for semantic compression. Preserves protected placeholders | |
| (__PROT_*) and applies adaptive clustering to compressible content. | |
| Parameters | |
| ---------- | |
| sentences : List[str] | |
| List of sentences to compress (may include protected placeholders). | |
| embeddings : np.ndarray | |
| Array of shape (len(sentences), embedding_dim) with precomputed embeddings. | |
| aggressiveness : Optional[float], optional | |
| Override compression intensity for this call. If None, uses the call-time | |
| mode preset when one is given, otherwise the instance default. | |
| mode : Optional[str], optional | |
| Override domain mode for this call. Affects aggressiveness preset. | |
| Returns | |
| ------- | |
| Tuple[List[str], Dict[str, Any]] | |
| - Compressed list of sentences (protected + representatives) | |
| - Statistics dictionary with: | |
| * original_count: input sentence count | |
| * compressed_count: output sentence count | |
| * compression_ratio: 1 - (compressed/original) | |
| * discarded_low_variance: count removed by variance filter | |
| * aggressiveness_used: effective aggressiveness value | |
| * duration_seconds: processing time | |
| * cluster_method: "ward" or "kmeans" | |
| * compressed_indices: original indices of kept sentences | |
| Raises | |
| ------ | |
| ValueError | |
| If len(sentences) != embeddings.shape[0]. | |
| Pipeline Overview | |
| ----------------- | |
| 1. Validate inputs and resolve effective aggressiveness | |
| 2. Separate protected (__PROT_*) from compressible sentences | |
| 3. Filter low-variance embeddings (noise removal) | |
| 4. Auto-select clustering method based on sentence count | |
| 5. Cluster and select representatives | |
| 6. Merge protected + compressed, preserving original order | |
| 7. Compute and return statistics | |
| Complexity | |
| ---------- | |
| Overall: | |
| * Small n (<200): O(n²·d) dominated by Ward clustering | |
| * Large n (≥200): O(n·k·d·i) dominated by K-Means | |
| where n=sentences, d=embedding_dim, k=clusters, i=iterations | |
| Space: O(n²) for Ward distance matrix, O(n·d) for K-Means | |
| Example | |
| ------- | |
| >>> compressor = SemanticCompressor(mode="code") | |
| >>> compressed, stats = compressor.compress(sentences, embeddings) | |
| >>> print(f"Reduced {stats['original_count']} → {stats['compressed_count']} " | |
| ... f"({stats['compression_ratio']:.1%} savings)") | |
| """ | |
| # Validate input dimensions | |
| if len(sentences) != embeddings.shape[0]: | |
| raise ValueError( | |
| f"Mismatch: {len(sentences)} sentences vs " | |
| f"{embeddings.shape[0]} embeddings" | |
| ) | |
| if not sentences: | |
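| return [], { | |
| "original_count": 0, | |
| "compressed_count": 0, | |
| "compression_ratio": 0.0, | |
| "discarded_low_variance": 0, | |
| "aggressiveness_used": None,  # not resolved for empty input | |
| "duration_seconds": 0.0, | |
| "cluster_method": None,  # no clustering was run | |
| "compressed_indices": [], | |
| } | |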
| # Resolve effective aggressiveness (call-time override > mode preset > instance default) | |
| if aggressiveness is not None: | |
| effective_agg = aggressiveness | |
| elif mode is not None and mode in self._MODE_AGGRESSIVENESS: | |
| effective_agg = self._MODE_AGGRESSIVENESS[mode] | |
| else: | |
| effective_agg = self.aggressiveness | |
| logger.debug(f"Effective aggressiveness: {effective_agg:.2f}") | |
| start_time = time.time() | |
| original_count = len(sentences) | |
| # Step 0: Separate protected placeholders from compressible content | |
| protected_sentences: List[str] = [] | |
| protected_indices: List[int] = [] | |
| normal_sentences: List[str] = [] | |
| normal_embeddings: List[np.ndarray] = [] | |
| normal_original_indices: List[int] = [] | |
| for i, sent in enumerate(sentences): | |
| if sent.startswith("__PROT_"): | |
| # Protected content: never compress, always preserve | |
| protected_sentences.append(sent) | |
| protected_indices.append(i) | |
| else: | |
| normal_sentences.append(sent) | |
| normal_embeddings.append(embeddings[i]) | |
| normal_original_indices.append(i) | |
| # Edge case: all content is protected → no compression possible | |
| if not normal_sentences: | |
| return sentences, { | |
| "original_count": original_count, | |
| "compressed_count": original_count, | |
| "compression_ratio": 0.0, | |
| "discarded_low_variance": 0, | |
| "aggressiveness_used": effective_agg, | |
| "duration_seconds": time.time() - start_time, | |
| "cluster_method": None,  # no clustering was run | |
| "compressed_indices": protected_indices, | |
| } | |
| # Step 1: Filter low-variance (low-information) sentences | |
| normal_embeddings_arr = np.array(normal_embeddings) | |
| filtered_sentences, filtered_embeddings, kept_local_indices = self._remove_low_variance( | |
| normal_sentences, normal_embeddings_arr | |
| ) | |
| filtered_original_indices = [normal_original_indices[i] for i in kept_local_indices] | |
| # Edge case: all normal sentences filtered out | |
| if not filtered_sentences: | |
| result_sentences = protected_sentences | |
| return result_sentences, { | |
| "original_count": original_count, | |
| "compressed_count": len(result_sentences), | |
| "compression_ratio": 1.0 - (len(result_sentences) / original_count), | |
| "discarded_low_variance": len(normal_sentences), | |
| "aggressiveness_used": effective_agg, | |
| "duration_seconds": time.time() - start_time, | |
| "cluster_method": None,  # no clustering was run | |
| "compressed_indices": protected_indices, | |
| } | |
| # Step 2: Auto-select clustering method based on sentence count | |
| clustering_method = self.method | |
| if clustering_method is None: | |
| if len(filtered_sentences) >= self.auto_method_threshold: | |
| clustering_method = "kmeans" | |
| logger.debug(f"Auto-selected K-Means (n={len(filtered_sentences)} >= threshold)") | |
| else: | |
| clustering_method = "ward" | |
| logger.debug(f"Auto-selected Ward (n={len(filtered_sentences)} < threshold)") | |
| logger.info(f"Clustering method: {clustering_method} ({len(filtered_sentences)} sentences)") | |
| # Step 3: Perform clustering and select representatives | |
| if clustering_method == "kmeans": | |
| compressed_normal, compressed_indices = self._cluster_kmeans( | |
| filtered_sentences, | |
| filtered_embeddings, | |
| effective_agg, | |
| filtered_original_indices, | |
| ) | |
| else: # ward | |
| compressed_normal, compressed_indices = self._cluster_ward( | |
| filtered_sentences, | |
| filtered_embeddings, | |
| effective_agg, | |
| filtered_original_indices, | |
| ) | |
| # Step 4: Merge protected + compressed, preserving original order | |
| # Protected and compressed indices are disjoint original positions, so collisions are not expected | |
| final_map: Dict[int, str] = {} | |
| # First, add protected sentences at their original positions | |
| for idx, sent in zip(protected_indices, protected_sentences): | |
| final_map[idx] = sent | |
| # Then add compressed sentences; the probe below is only a defensive fallback | |
| for sent, idx in zip(compressed_normal, compressed_indices): | |
| while idx in final_map: | |
| idx += 1 # Linear probing for next available slot | |
| final_map[idx] = sent | |
| # Sort by index and extract final sentence list | |
| sorted_indices = sorted(final_map.keys()) | |
| compressed_sentences = [final_map[i] for i in sorted_indices] | |
| # Step 5: Compute statistics | |
| compressed_count = len(compressed_sentences) | |
| compression_ratio = 1.0 - (compressed_count / original_count) if original_count > 0 else 0.0 | |
| discarded_by_variance = original_count - len(filtered_sentences) - len(protected_sentences) | |
| stats = { | |
| "original_count": original_count, | |
| "compressed_count": compressed_count, | |
| "compression_ratio": compression_ratio, | |
| "discarded_low_variance": discarded_by_variance, | |
| "aggressiveness_used": effective_agg, | |
| "duration_seconds": time.time() - start_time, | |
| "cluster_method": clustering_method, | |
| "compressed_indices": sorted_indices, # Original indices of kept sentences | |
| } | |
| logger.info( | |
| f"Compression complete: {original_count} → {compressed_count} " | |
| f"({compression_ratio:.1%} reduction) in {stats['duration_seconds']:.3f}s" | |
| ) | |
| return compressed_sentences, stats | |
| async def compress_async( | |
| self, | |
| sentences: List[str], | |
| embeddings: np.ndarray, | |
| aggressiveness: Optional[float] = None, | |
| mode: Optional[str] = None, | |
| ) -> Tuple[List[str], Dict[str, Any]]: | |
| """ | |
| Asynchronous version of compress (non-blocking event loop). | |
| Offloads CPU-bound compression to a worker thread via asyncio.to_thread (Python 3.9+), | |
| preventing event loop starvation in async applications. | |
| Parameters | |
| ---------- | |
| sentences : List[str] | |
| List of sentences to compress. | |
| embeddings : np.ndarray | |
| Precomputed embeddings array. | |
| aggressiveness : Optional[float], optional | |
| Override compression intensity. | |
| mode : Optional[str], optional | |
| Override domain mode. | |
| Returns | |
| ------- | |
| Tuple[List[str], Dict[str, Any]] | |
| Compressed sentences and statistics (same as compress()). | |
| Note | |
| ---- | |
| - Does not provide true parallelism; uses thread pool for offloading | |
| - Suitable for high-concurrency async servers (FastAPI, etc.) | |
| - For true parallelism, use multiprocessing or distributed processing | |
| """ | |
| return await asyncio.to_thread( | |
| self.compress, sentences, embeddings, aggressiveness, mode | |
| ) | |
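
The thread-offloading pattern behind `compress_async` can be tried on its own. The sketch below is not part of the module: `slow_dedupe` is a hypothetical stand-in for a blocking `compress()` call, and `dedupe_async`, `main`, and the sample batches are made up for illustration. It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time
from typing import List, Tuple


def slow_dedupe(sentences: List[str]) -> Tuple[List[str], int]:
    """Hypothetical stand-in for a blocking, CPU-bound compress() call."""
    time.sleep(0.05)  # simulate work that would otherwise block the event loop
    seen = set()
    kept: List[str] = []
    for sentence in sentences:
        if sentence not in seen:  # keep the first occurrence, preserve order
            seen.add(sentence)
            kept.append(sentence)
    return kept, len(sentences) - len(kept)


async def dedupe_async(sentences: List[str]) -> Tuple[List[str], int]:
    # asyncio.to_thread runs the blocking function in a worker thread,
    # so the event loop stays free to serve other coroutines meanwhile.
    return await asyncio.to_thread(slow_dedupe, sentences)


async def main() -> List[Tuple[List[str], int]]:
    batches = [["a", "b", "a"], ["x", "x", "x"], ["p", "q"]]
    # gather schedules every batch concurrently and returns results
    # in the same order as the inputs.
    return await asyncio.gather(*(dedupe_async(b) for b in batches))


if __name__ == "__main__":
    for kept, removed in asyncio.run(main()):
        print(kept, removed)
```

As the docstring above notes, this keeps the event loop responsive but does not by itself give CPU parallelism; a process pool would be the usual choice when that is required.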