| |
| """ |
| Generate plots comparing compression methods. |
| """ |
|
|
| import json |
| import matplotlib.pyplot as plt |
| import matplotlib |
| import seaborn as sns |
| import numpy as np |
| import pandas as pd |
| from pathlib import Path |
|
|
| |
# Select the non-interactive 'Agg' backend so figures are rendered straight to
# image files (every plot in this script is written with savefig, never shown).
matplotlib.use('Agg')


# Start from matplotlib's default style and use seaborn's "husl" colour palette.
plt.style.use('default')
sns.set_palette("husl")
|
|
|
|
def load_results(filename='compression_results.json'):
    """Load compression results from a JSON file.

    Args:
        filename: Path of the JSON results file to read.

    Returns:
        The decoded JSON content, or ``None`` when the file does not exist
        (in that case a hint on how to generate it is printed).

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    try:
        # JSON is UTF-8 by specification; reading with the platform default
        # encoding can corrupt or reject non-ASCII content on some systems.
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Results file {filename} not found. Run 'just run' first to generate results.")
        return None
|
|
|
|
def _classify_dataset(name):
    """Map a dataset name to a distribution label via substring match."""
    for keyword, label in (
        ('uniform', 'Uniform'),
        ('zipf', 'Zipf'),
        ('geometric', 'Geometric'),
        ('english', 'English Text'),
    ):
        if keyword in name:
            return label
    return 'Other'


def create_comparison_dataframe(results):
    """Convert raw results into a long-format pandas DataFrame for plotting.

    One row is produced per (dataset, method) pair, plus one synthetic
    ``'theoretical'`` row per dataset built from its ``theoretical_minimum``.

    Args:
        results: Iterable of dicts, each with the keys ``name``,
            ``original_size``, ``theoretical_minimum``, ``vocabulary_size``
            and ``methods`` (a mapping of method name to a result dict or
            ``None``).

    Returns:
        A ``pandas.DataFrame`` with the dataset fields plus ``method``,
        ``method_type``, ``k_value``, ``compressed_size``,
        ``compression_ratio``, ``bits_per_symbol``, ``correct``,
        ``encoding_time`` and ``status`` (``'success'``, ``'timeout'`` or
        ``'failed'``). Missing values end up as NaN/None.
    """
    k_prefix = 'equiprobable_k'
    rows = []

    for result in results:
        name = result['name']
        original_size = result['original_size']
        theoretical_min = result['theoretical_minimum']

        row_base = {
            'dataset': name,
            'original_size': original_size,
            'theoretical_minimum': theoretical_min,
            'vocabulary_size': result['vocabulary_size'],
            'dataset_type': _classify_dataset(name),
        }

        for method_name, method_data in result['methods'].items():
            row = row_base.copy()
            row['method'] = method_name

            if method_name.startswith(k_prefix):
                row['method_type'] = 'Equiprobable'
                # Everything after the prefix is the partition count k.
                row['k_value'] = int(method_name[len(k_prefix):])
            elif method_name == 'enumerative':
                row['method_type'] = 'Enumerative'
                row['k_value'] = None
            elif method_name == 'huffman':
                row['method_type'] = 'Huffman'
                row['k_value'] = None

            if method_data is None or method_data.get('compressed_size') is None:
                # The method produced no output: record it as a timeout or failure.
                row['compressed_size'] = None
                row['compression_ratio'] = None
                row['bits_per_symbol'] = None
                row['correct'] = False
                row['encoding_time'] = method_data.get('encoding_time', 0) if method_data else 0
                row['status'] = 'timeout' if method_data and method_data.get('timed_out') else 'failed'
            else:
                row['compressed_size'] = method_data['compressed_size']
                row['compression_ratio'] = method_data['compression_ratio']
                row['bits_per_symbol'] = method_data['bits_per_symbol']
                row['correct'] = method_data['correct']
                row['encoding_time'] = method_data.get('encoding_time', 0)
                row['status'] = 'success'

            rows.append(row)

        # Synthetic reference row for the theoretical lower bound.
        row = row_base.copy()
        row['method'] = 'theoretical'
        row['method_type'] = 'Theoretical'
        row['compressed_size'] = theoretical_min
        # A zero bound (e.g. zero-entropy data) or an empty dataset would
        # otherwise raise ZeroDivisionError; report the value as missing.
        row['compression_ratio'] = original_size / theoretical_min if theoretical_min else None
        row['bits_per_symbol'] = theoretical_min * 8 / original_size if original_size else None
        row['correct'] = True
        row['k_value'] = None
        row['status'] = 'success'
        rows.append(row)

    return pd.DataFrame(rows)
|
|
|
|
def _annotate_enumerative_timeouts(ax, df, datasets):
    """Draw a red 'TIMEOUT' badge above each dataset whose enumerative run timed out."""
    for i, dataset in enumerate(datasets):
        enum_rows = df[(df['dataset'] == dataset) & (df['method'] == 'enumerative')]
        if not enum_rows.empty and enum_rows.iloc[0]['status'] == 'timeout':
            ax.text(i, ax.get_ylim()[1] * 0.9, 'TIMEOUT',
                    ha='center', va='center', fontsize=8,
                    bbox=dict(boxstyle="round,pad=0.3", facecolor="red", alpha=0.7))


def plot_compression_ratios(df, save_path='plots'):
    """Plot a 2x2 overview comparing theoretical, Huffman and enumerative coding.

    Panels: compression ratio per dataset, bits per symbol per dataset,
    enumerative encoding time vs. dataset size (log-log, with a fitted trend
    line), and efficiency relative to the theoretical minimum.

    Args:
        df: DataFrame with one row per (dataset, method); the columns used are
            ``dataset``, ``method``, ``compression_ratio``, ``bits_per_symbol``,
            ``compressed_size``, ``original_size``, ``encoding_time`` and
            ``status``.
        save_path: Directory in which ``compression_comparison.png`` is
            written; it is created (including parents) if missing.

    Raises:
        ValueError: Propagated from ``DataFrame.pivot`` when a
            (dataset, method) pair occurs more than once.
    """
    out_dir = Path(save_path)
    out_dir.mkdir(parents=True, exist_ok=True)

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Compression Performance Comparison', fontsize=16, fontweight='bold')

    # NOTE(review): the TIMEOUT badges assume the pivot index order matches
    # this sorted list (pivot is expected to return a sorted index) - confirm.
    datasets = sorted(df['dataset'].unique())
    key_methods = ['theoretical', 'huffman', 'enumerative']

    # Panel 1: compression ratio by dataset.
    ax1 = axes[0, 0]
    pivot_data = df.pivot(index='dataset', columns='method', values='compression_ratio')
    available_methods = [col for col in key_methods if col in pivot_data.columns]
    pivot_data[available_methods].plot(kind='bar', ax=ax1, width=0.8)
    ax1.set_title('Compression Ratio by Dataset')
    ax1.set_xlabel('Dataset')
    ax1.set_ylabel('Compression Ratio')
    ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    ax1.tick_params(axis='x', rotation=45)
    _annotate_enumerative_timeouts(ax1, df, datasets)

    # Panel 2: bits per symbol by dataset.
    ax2 = axes[0, 1]
    pivot_bits = df.pivot(index='dataset', columns='method', values='bits_per_symbol')
    pivot_bits[available_methods].plot(kind='bar', ax=ax2, width=0.8)
    ax2.set_title('Bits per Symbol by Dataset')
    ax2.set_xlabel('Dataset')
    ax2.set_ylabel('Bits per Symbol')
    ax2.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    ax2.tick_params(axis='x', rotation=45)
    _annotate_enumerative_timeouts(ax2, df, datasets)

    # Panel 3: enumerative encoding time vs. dataset size.
    ax3 = axes[1, 0]
    enum_data = df[df['method'] == 'enumerative']

    if not enum_data.empty:
        successful_enum = enum_data[enum_data['status'] == 'success']
        timeout_enum = enum_data[enum_data['status'] == 'timeout']

        if not successful_enum.empty:
            ax3.scatter(successful_enum['original_size'], successful_enum['encoding_time'],
                        c='green', marker='o', s=60, alpha=0.7, label='Successful')

        if not timeout_enum.empty:
            ax3.scatter(timeout_enum['original_size'], timeout_enum['encoding_time'],
                        c='red', marker='X', s=80, alpha=0.9, label='Timeout')

        ax3.set_title('Enumerative Encoding Time vs Dataset Size')
        ax3.set_xlabel('Dataset Size (symbols)')
        ax3.set_ylabel('Encoding Time (seconds)')
        ax3.set_xscale('log')
        ax3.set_yscale('log')
        ax3.grid(True, alpha=0.3)

        # The fit is done in log space, so only strictly positive points are
        # usable (a missing encoding time defaults to 0 upstream), and at
        # least two distinct sizes are needed for a meaningful slope.
        fit_rows = successful_enum[(successful_enum['original_size'] > 0)
                                   & (successful_enum['encoding_time'] > 0)]
        x_vals = fit_rows['original_size'].to_numpy(dtype=float)
        y_vals = fit_rows['encoding_time'].to_numpy(dtype=float)
        if np.unique(x_vals).size > 1:
            z = np.polyfit(np.log10(x_vals), np.log10(y_vals), 1)
            p = np.poly1d(z)
            x_trend = np.logspace(np.log10(x_vals.min()), np.log10(x_vals.max()), 100)
            y_trend = 10 ** p(np.log10(x_trend))
            ax3.plot(x_trend, y_trend, 'b--', alpha=0.5, linewidth=1,
                     label=f'Trend (slope: {z[0]:.2f})')

        ax3.legend()
    else:
        ax3.text(0.5, 0.5, 'No enumerative data available',
                 ha='center', va='center', transform=ax3.transAxes)
        ax3.set_title('Enumerative Encoding Time vs Dataset Size')

    # Panel 4: efficiency relative to the theoretical minimum.
    ax4 = axes[1, 1]
    position = {name: idx for idx, name in enumerate(datasets)}
    theoretical_data = df[df['method'] == 'theoretical'].set_index('dataset')['compressed_size']

    for method in ['huffman', 'enumerative']:
        if method not in df['method'].values:
            continue

        method_df = df[df['method'] == method].set_index('dataset')

        successful_data = method_df[method_df['compressed_size'].notna()]
        if not successful_data.empty:
            # Index-aligned division; datasets missing on either side become NaN.
            efficiency = (theoretical_data / successful_data['compressed_size']).dropna()
            common = [d for d in efficiency.index if d in position]
            ax4.plot([position[d] for d in common], [efficiency[d] for d in common],
                     marker='o', label=method, linewidth=2)

        if method == 'enumerative':
            failed_data = method_df[method_df['compressed_size'].isna()]
            if not failed_data.empty:
                failed_indices = [position[d] for d in failed_data.index if d in position]
                ax4.scatter(failed_indices, [0.1] * len(failed_indices),
                            marker='X', s=100, color='red', label=f'{method} (timeout)', zorder=5)

    ax4.set_title('Efficiency vs Theoretical Minimum')
    ax4.set_xlabel('Dataset Index')
    ax4.set_ylabel('Efficiency (Theoretical/Actual)')
    ax4.set_xticks(range(len(datasets)))
    ax4.set_xticklabels([d[:15] + '...' if len(d) > 15 else d for d in datasets], rotation=45)
    # Draw the reference line BEFORE building the legend, otherwise its
    # 'Perfect efficiency' entry is never listed.
    ax4.axhline(y=1.0, color='gray', linestyle='--', alpha=0.7, label='Perfect efficiency')
    ax4.legend()
    ax4.set_ylim(0, ax4.get_ylim()[1])

    plt.tight_layout()
    plt.savefig(out_dir / 'compression_comparison.png', dpi=300, bbox_inches='tight')
    plt.close(fig)
|
|
|
|
def plot_k_parameter_analysis(df, save_path='plots'):
    """Plot how the partition count k affects equiprobable (EP) coding.

    Panels: compression ratio vs. k, bits per symbol vs. k, the best k per
    dataset, and the relative improvement over the k=2 baseline. The line
    plots are restricted to a fixed list of representative datasets.

    Args:
        df: DataFrame with the columns ``dataset``, ``method_type``,
            ``k_value``, ``compression_ratio`` and ``bits_per_symbol``.
        save_path: Directory in which ``k_parameter_analysis.png`` is written;
            it is created (including parents) if missing.

    Returns:
        None. Prints a message and returns early when ``df`` contains no
        equiprobable rows.
    """
    out_dir = Path(save_path)
    out_dir.mkdir(parents=True, exist_ok=True)

    ep_data = df[df['method_type'] == 'Equiprobable'].copy()

    if ep_data.empty:
        print("No equiprobable data found for k parameter analysis")
        return

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Equiprobable Partitioning: Effect of k Parameter', fontsize=16, fontweight='bold')

    datasets_to_plot = ['small_uniform_10', 'medium_zipf_256', 'large_geometric_64', 'english_text']
    present = set(ep_data['dataset'])
    # Per-dataset rows ordered by k, computed once and reused by three panels.
    series_by_dataset = {
        dataset: ep_data[ep_data['dataset'] == dataset].sort_values('k_value')
        for dataset in datasets_to_plot
        if dataset in present
    }

    # Panel 1: compression ratio vs. k.
    ax1 = axes[0, 0]
    for dataset, dataset_data in series_by_dataset.items():
        ax1.plot(dataset_data['k_value'], dataset_data['compression_ratio'],
                 marker='o', label=dataset, linewidth=2)
    ax1.set_title('Compression Ratio vs k Parameter')
    ax1.set_xlabel('k (Number of Partitions)')
    ax1.set_ylabel('Compression Ratio')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Panel 2: bits per symbol vs. k.
    ax2 = axes[0, 1]
    for dataset, dataset_data in series_by_dataset.items():
        ax2.plot(dataset_data['k_value'], dataset_data['bits_per_symbol'],
                 marker='s', label=dataset, linewidth=2)
    ax2.set_title('Bits per Symbol vs k Parameter')
    ax2.set_xlabel('k (Number of Partitions)')
    ax2.set_ylabel('Bits per Symbol')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Panel 3: best k for every dataset that has at least one usable ratio.
    ax3 = axes[1, 0]
    optimal_k = {}
    for dataset in ep_data['dataset'].unique():
        # Failed runs carry a missing ratio; idxmax over an all-missing group
        # has no valid answer, so such datasets are skipped.
        scored = ep_data[ep_data['dataset'] == dataset].dropna(subset=['compression_ratio', 'k_value'])
        if scored.empty:
            continue
        best_idx = scored['compression_ratio'].idxmax()
        optimal_k[dataset] = scored.loc[best_idx, 'k_value']

    if optimal_k:
        datasets = list(optimal_k.keys())
        k_values = list(optimal_k.values())

        colors = ['red' if 'uniform' in d else 'blue' if 'zipf' in d else 'green' if 'geometric' in d else 'orange'
                  for d in datasets]

        bars = ax3.bar(range(len(datasets)), k_values, color=colors, alpha=0.7)
        ax3.set_title('Optimal k Value by Dataset')
        ax3.set_xlabel('Dataset')
        ax3.set_ylabel('Optimal k')
        ax3.set_xticks(range(len(datasets)))
        ax3.set_xticklabels([d[:15] + '...' if len(d) > 15 else d for d in datasets], rotation=45)

        # Print the k value just above each bar.
        for bar, k_val in zip(bars, k_values):
            ax3.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.1,
                     str(int(k_val)), ha='center', va='bottom')

    # Panel 4: percentage improvement over the k=2 baseline.
    ax4 = axes[1, 1]
    for dataset, dataset_data in series_by_dataset.items():
        if len(dataset_data) < 2:
            continue
        baseline_rows = dataset_data.loc[dataset_data['k_value'] == 2, 'compression_ratio'].dropna()
        # Without a usable, non-zero k=2 result there is nothing to compare against.
        if baseline_rows.empty or baseline_rows.iloc[0] == 0:
            continue
        baseline = baseline_rows.iloc[0]
        improvement = (dataset_data['compression_ratio'] / baseline - 1) * 100
        ax4.plot(dataset_data['k_value'], improvement,
                 marker='^', label=dataset, linewidth=2)

    ax4.set_title('Performance Improvement over k=2 (%)')
    ax4.set_xlabel('k (Number of Partitions)')
    ax4.set_ylabel('Improvement (%)')
    ax4.legend()
    ax4.grid(True, alpha=0.3)
    ax4.axhline(y=0, color='black', linestyle='-', alpha=0.5)

    plt.tight_layout()
    plt.savefig(out_dir / 'k_parameter_analysis.png', dpi=300, bbox_inches='tight')
    plt.close(fig)
|
|
|
|
def plot_distribution_comparison(df, save_path='plots'):
    """Compare Huffman and enumerative coding across data distributions.

    Panels: compression-ratio box plot per distribution, enumerative ratio
    relative to Huffman per distribution, compression ratio vs. vocabulary
    size, and compression ratio vs. dataset size. Only rows flagged as
    correct (or the theoretical reference rows) are considered.

    Args:
        df: DataFrame with the columns ``dataset``, ``method``, ``correct``,
            ``compression_ratio``, ``vocabulary_size`` and ``original_size``.
            The frame is not modified.
        save_path: Directory in which ``distribution_comparison.png`` is
            written; it is created (including parents) if missing.
    """
    out_dir = Path(save_path)
    out_dir.mkdir(parents=True, exist_ok=True)

    def get_distribution(name):
        if 'uniform' in name:
            return 'Uniform'
        elif 'zipf' in name:
            return 'Zipf'
        elif 'geometric' in name:
            return 'Geometric'
        elif 'english' in name:
            return 'Natural Text'
        else:
            return 'Other'

    # assign() returns a new frame, so the caller's DataFrame does not
    # silently gain a 'distribution' column.
    df = df.assign(distribution=df['dataset'].apply(get_distribution))

    # eq(True) treats a missing 'correct' flag as "not correct" instead of
    # failing on the boolean OR.
    keep = df['correct'].eq(True) | (df['method'] == 'theoretical')
    df_plot = df[keep].copy()

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Performance by Data Distribution', fontsize=16, fontweight='bold')

    methods_to_plot = ['huffman', 'enumerative']

    # Panel 1: compression ratio per distribution and method.
    ax1 = axes[0, 0]
    plot_data = df_plot[df_plot['method'].isin(methods_to_plot)]

    if not plot_data.empty:
        sns.boxplot(data=plot_data, x='distribution', y='compression_ratio', hue='method', ax=ax1)
        ax1.set_title('Compression Ratio Distribution by Data Type')
        ax1.set_xlabel('Data Distribution')
        ax1.set_ylabel('Compression Ratio')
        ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')

    # Panel 2: enumerative ratio relative to Huffman, grouped by distribution.
    ax2 = axes[0, 1]
    huffman_data = df_plot[df_plot['method'] == 'huffman'].set_index('dataset')['compression_ratio']
    enum_data = df_plot[df_plot['method'] == 'enumerative'].set_index('dataset')

    if not enum_data.empty and not huffman_data.empty:
        # Sorted so that the box order is the same on every run (plain set
        # iteration order for strings varies between interpreter runs).
        common_datasets = sorted(set(huffman_data.index) & set(enum_data.index))

        distribution_ratios = {}
        for dataset in common_datasets:
            enum_ratio = enum_data.loc[dataset, 'compression_ratio']
            huffman_ratio = huffman_data.loc[dataset]
            relative_efficiency = enum_ratio / huffman_ratio

            dist_type = df_plot[df_plot['dataset'] == dataset]['distribution'].iloc[0]
            distribution_ratios.setdefault(dist_type, []).append(relative_efficiency)

        if distribution_ratios:
            distributions = list(distribution_ratios.keys())
            ratios = [distribution_ratios[dist] for dist in distributions]

            bp = ax2.boxplot(ratios, tick_labels=distributions, patch_artist=True)

            # One colour per possible distribution label (five in total).
            colors = ['lightblue', 'lightgreen', 'lightcoral', 'lightyellow', 'lightgray']
            for patch, color in zip(bp['boxes'], colors):
                patch.set_facecolor(color)
                patch.set_alpha(0.7)

            ax2.set_title('Enumerative Efficiency Relative to Huffman')
            ax2.set_xlabel('Data Distribution')
            ax2.set_ylabel('Enumerative Ratio / Huffman Ratio')
            ax2.axhline(y=1.0, color='red', linestyle='--', alpha=0.7, label='Equal to Huffman')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

    # Panel 3: compression ratio vs. vocabulary size.
    ax3 = axes[1, 0]
    for method in methods_to_plot:
        method_subset = df_plot[df_plot['method'] == method]
        if not method_subset.empty:
            ax3.scatter(method_subset['vocabulary_size'], method_subset['compression_ratio'],
                        label=method, alpha=0.7, s=60)

    ax3.set_title('Compression vs Vocabulary Size')
    ax3.set_xlabel('Vocabulary Size')
    ax3.set_ylabel('Compression Ratio')
    ax3.set_xscale('log')
    ax3.legend()
    ax3.grid(True, alpha=0.3)

    # Panel 4: compression ratio vs. dataset size.
    ax4 = axes[1, 1]
    for method in methods_to_plot:
        method_subset = df_plot[df_plot['method'] == method]
        if not method_subset.empty:
            ax4.scatter(method_subset['original_size'], method_subset['compression_ratio'],
                        label=method, alpha=0.7, s=60)

    ax4.set_title('Compression vs Dataset Size')
    ax4.set_xlabel('Original Size (bytes)')
    ax4.set_ylabel('Compression Ratio')
    ax4.set_xscale('log')
    ax4.legend()
    ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(out_dir / 'distribution_comparison.png', dpi=300, bbox_inches='tight')
    plt.close(fig)
|
|
|
|
def generate_summary_table(df):
    """Print a per-dataset summary table of the compression results.

    For every dataset (in sorted order) one line is printed for each of the
    theoretical, Huffman and enumerative methods that is present, followed by
    a separator line. Runs without a compressed size are reported as
    ``TIMEOUT`` or ``FAILED`` with ``N/A`` in the numeric columns.

    Args:
        df: DataFrame with the columns ``dataset``, ``method``,
            ``compressed_size``, ``compression_ratio`` and
            ``bits_per_symbol``; ``encoding_time`` and ``status`` are used
            when present.
    """
    print("\n" + "="*130)
    print("DETAILED COMPRESSION ANALYSIS")
    print("="*130)

    methods_order = ['theoretical', 'huffman', 'enumerative']

    print(f"{'Dataset':<25} {'Method':<15} {'Size':<8} {'Ratio':<7} {'Bits/Sym':<8} {'Efficiency':<10} {'Time':<8}")
    print("-" * 130)

    for dataset in sorted(df['dataset'].unique()):
        dataset_data = df[df['dataset'] == dataset]
        theoretical_data = dataset_data[dataset_data['method'] == 'theoretical']

        # Reset per dataset so a missing theoretical row can neither raise
        # nor reuse the previous dataset's value.
        theoretical_ratio = None
        if not theoretical_data.empty:
            theoretical_ratio = theoretical_data['compression_ratio'].iloc[0]
        has_reference = pd.notna(theoretical_ratio) and theoretical_ratio != 0

        for method in methods_order:
            method_data = dataset_data[dataset_data['method'] == method]
            if method_data.empty:
                continue
            row = method_data.iloc[0]
            encoding_time = row.get('encoding_time')

            # A missing size is stored as NaN inside a DataFrame, so an
            # "is not None" test would never detect failed runs.
            if pd.notna(row['compressed_size']):
                efficiency_str = (f"{row['compression_ratio'] / theoretical_ratio:.3f}"
                                  if has_reference else "N/A")
                time_str = f"{encoding_time:.3f}s" if pd.notna(encoding_time) else "N/A"

                print(f"{dataset:<25} {method:<15} {row['compressed_size']:<8.0f} "
                      f"{row['compression_ratio']:<7.2f} {row['bits_per_symbol']:<8.2f} "
                      f"{efficiency_str:<10} {time_str:<8}")
            else:
                time_str = f"{encoding_time:.1f}s" if pd.notna(encoding_time) else "N/A"
                status = "TIMEOUT" if row.get('status') == 'timeout' else "FAILED"

                print(f"{dataset:<25} {method:<15} {status:<8} {'N/A':<7} {'N/A':<8} "
                      f"{'N/A':<10} {time_str:<8}")

        print("-" * 130)
|
|
|
|
def plot_enumerative_timeout_analysis(df, save_path='plots'):
    """Plot enumerative encoding times and timeouts, then print a summary.

    Each enumerative run becomes a point at (vocabulary size, dataset size)
    on log-log axes. Runs that did not time out are coloured by
    log10(encoding time) with one marker shape per dataset type; timed-out
    runs are drawn as red crosses. Every point is annotated with its time.

    Args:
        df: DataFrame with the columns ``dataset``, ``method``, ``status``,
            ``vocabulary_size``, ``original_size``, ``encoding_time`` and
            ``dataset_type``.
        save_path: Directory in which ``enumerative_timeout_analysis.png`` is
            written; it is created (including parents) if missing.

    Returns:
        None. Prints a message and returns early when ``df`` contains no
        enumerative rows.
    """
    out_dir = Path(save_path)
    out_dir.mkdir(parents=True, exist_ok=True)

    enum_df = df[df['method'] == 'enumerative'].copy()

    if enum_df.empty:
        print("No enumerative data found for timeout analysis")
        return

    fig, ax = plt.subplots(1, 1, figsize=(12, 8))
    fig.suptitle('Enumerative Encoding: Computation Time and Timeouts',
                 fontsize=14, fontweight='bold')

    # (substring of the dataset name, legend label, marker); first match wins.
    type_styles = (
        ('uniform', 'Uniform', 'o'),
        ('zipf', 'Zipf', 's'),
        ('geometric', 'Geometric', '^'),
        ('english', 'English Text', 'D'),
    )

    enum_stats = []
    for _, row in enum_df.iterrows():
        dataset_name = row['dataset']
        dataset_type, marker = 'Other', 'x'
        for keyword, label, symbol in type_styles:
            if keyword in dataset_name:
                dataset_type, marker = label, symbol
                break

        enum_stats.append({
            'dataset': dataset_name,
            'vocab_size': row['vocabulary_size'],
            'original_size': row['original_size'],
            'dataset_type': dataset_type,
            'marker': marker,
            'timed_out': row['status'] == 'timeout',
            'encoding_time': row.get('encoding_time', 0),
        })

    if enum_stats:
        stats_df = pd.DataFrame(enum_stats)

        successful_data = stats_df[~stats_df['timed_out']]
        timeout_data = stats_df[stats_df['timed_out']]

        def log_times(frame):
            # Clamp to 1 ms so that zero/very small times stay finite in log space.
            return np.log10(np.maximum(frame['encoding_time'].to_numpy(dtype=float), 0.001))

        scatter_success = None
        if not successful_data.empty:
            # One colour range for ALL groups: each scatter call would
            # otherwise autoscale to its own data, and the single colorbar
            # (taken from the first group) would misrepresent the others.
            all_log_times = log_times(successful_data)
            vmin, vmax = all_log_times.min(), all_log_times.max()

            for dataset_type in successful_data['dataset_type'].unique():
                type_data = successful_data[successful_data['dataset_type'] == dataset_type]

                scatter = ax.scatter(type_data['vocab_size'], type_data['original_size'],
                                     c=log_times(type_data), cmap='viridis',
                                     vmin=vmin, vmax=vmax,
                                     marker=type_data['marker'].iloc[0],
                                     s=100, alpha=0.8, edgecolors='black', linewidth=0.5,
                                     label=f'{dataset_type}')

                if scatter_success is None:
                    scatter_success = scatter

        if not timeout_data.empty:
            ax.scatter(timeout_data['vocab_size'], timeout_data['original_size'],
                       color='red', marker='X', s=150, alpha=0.9,
                       edgecolors='darkred', linewidth=1,
                       label='Timeout')

        if scatter_success is not None:
            cbar = plt.colorbar(scatter_success, ax=ax)
            cbar.set_label('log₁₀(Encoding Time in seconds)')

        ax.set_xlabel('Vocabulary Size')
        ax.set_ylabel('Dataset Size (symbols)')
        ax.set_xscale('log')
        ax.set_yscale('log')
        ax.grid(True, alpha=0.3)

        # Legend below the axes so it does not cover any points.
        ax.legend(bbox_to_anchor=(0.5, -0.15), loc='upper center', ncol=3)

        # Label every point with its time ("TO:" prefix marks a timeout).
        for _, row in stats_df.iterrows():
            if row['timed_out']:
                time_label = f"TO:{row['encoding_time']:.1f}s"
            else:
                time_label = f"{row['encoding_time']:.2f}s"

            ax.annotate(time_label,
                        (row['vocab_size'], row['original_size']),
                        xytext=(5, 5), textcoords='offset points',
                        fontsize=8, alpha=0.8)

    plt.tight_layout()
    plt.savefig(out_dir / 'enumerative_timeout_analysis.png', dpi=300, bbox_inches='tight')
    plt.close(fig)

    # Text summary on stdout.
    print("\nEnumerative Encoding Performance Summary:")
    print("=" * 50)

    enum_success = enum_df[enum_df['status'] == 'success']
    enum_timeout = enum_df[enum_df['status'] == 'timeout']

    print(f"Successful encodings: {len(enum_success)}")
    print(f"Timed out encodings: {len(enum_timeout)}")

    if not enum_success.empty:
        avg_time = enum_success['encoding_time'].mean()
        max_time = enum_success['encoding_time'].max()
        min_time = enum_success['encoding_time'].min()
        print(f"Encoding time stats (successful): min={min_time:.3f}s, avg={avg_time:.3f}s, max={max_time:.3f}s")

    if not enum_timeout.empty:
        print("Datasets that timed out:")
        for _, row in enum_timeout.iterrows():
            print(f"  {row['dataset']}: vocab={row['vocabulary_size']}, size={row['original_size']}")

    print("Performance by dataset type:")
    for dtype in enum_df['dataset_type'].unique():
        type_data = enum_df[enum_df['dataset_type'] == dtype]
        success_rate = len(type_data[type_data['status'] == 'success']) / len(type_data)
        print(f"  {dtype}: {success_rate:.1%} success rate")
|
|
|
|
def plot_compression_time_comparison(df, save_path='plots'):
    """Plot Huffman vs. enumerative encoding times and print a summary.

    Panels: per-dataset encoding time for datasets both methods completed,
    encoding time vs. dataset size, the enumerative/Huffman time ratio vs.
    vocabulary size, and a box plot of the encoding-time distributions.
    Only rows with a strictly positive ``encoding_time`` are used.

    Args:
        df: DataFrame with the columns ``dataset``, ``method``, ``status``,
            ``encoding_time``, ``original_size``, ``vocabulary_size`` and
            ``dataset_type``.
        save_path: Directory in which ``compression_time_comparison.png`` is
            written; it is created (including parents) if missing.

    Returns:
        None. Prints a message and returns early when no row has a positive
        encoding time.
    """
    out_dir = Path(save_path)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Positive times only: the time axes below are logarithmic.
    timing_data = df[df['encoding_time'].notna() & (df['encoding_time'] > 0)].copy()

    if timing_data.empty:
        print("No timing data available for compression time comparison")
        return

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Compression Time Comparison: Huffman vs Enumerative', fontsize=16, fontweight='bold')

    huffman_times = timing_data[timing_data['method'] == 'huffman']
    enum_times = timing_data[(timing_data['method'] == 'enumerative') & (timing_data['status'] == 'success')]
    both_available = not huffman_times.empty and not enum_times.empty

    # Panel 1: side-by-side bars for datasets both methods completed.
    ax1 = axes[0, 0]
    if both_available:
        common_datasets = set(huffman_times['dataset']) & set(enum_times['dataset'])

        if common_datasets:
            # Both frames are sorted by dataset so bars and tick labels line up.
            huffman_common = huffman_times[huffman_times['dataset'].isin(common_datasets)].sort_values('dataset')
            enum_common = enum_times[enum_times['dataset'].isin(common_datasets)].sort_values('dataset')

            x = np.arange(len(common_datasets))
            width = 0.35

            ax1.bar(x - width/2, huffman_common['encoding_time'], width,
                    label='Huffman', alpha=0.8, color='blue')
            ax1.bar(x + width/2, enum_common['encoding_time'], width,
                    label='Enumerative', alpha=0.8, color='green')

            ax1.set_title('Encoding Time by Dataset (Successful Cases)')
            ax1.set_xlabel('Dataset')
            ax1.set_ylabel('Encoding Time (seconds)')
            ax1.set_yscale('log')
            ax1.set_xticks(x)
            ax1.set_xticklabels([d[:15] + '...' if len(d) > 15 else d for d in sorted(common_datasets)], rotation=45)
            ax1.legend()
            ax1.grid(True, alpha=0.3)

    # Panel 2: encoding time vs. dataset size, timeouts highlighted.
    ax2 = axes[0, 1]
    for method in ['huffman', 'enumerative']:
        method_data = timing_data[timing_data['method'] == method]
        if method_data.empty:
            continue

        successful = method_data[method_data['status'] == 'success']
        if not successful.empty:
            ax2.scatter(successful['original_size'], successful['encoding_time'],
                        label=f'{method} (success)', alpha=0.7, s=60)

        if method == 'enumerative':
            timeouts = method_data[method_data['status'] == 'timeout']
            if not timeouts.empty:
                ax2.scatter(timeouts['original_size'], timeouts['encoding_time'],
                            label='enumerative (timeout)', alpha=0.9, s=80, marker='X', color='red')

    ax2.set_title('Encoding Time vs Dataset Size')
    ax2.set_xlabel('Dataset Size (symbols)')
    ax2.set_ylabel('Encoding Time (seconds)')
    ax2.set_xscale('log')
    ax2.set_yscale('log')
    ax2.legend()
    ax2.grid(True, alpha=0.3)

    # Panel 3: enumerative/Huffman time ratio vs. vocabulary size.
    ax3 = axes[1, 0]
    if both_available:
        huffman_dict = dict(zip(huffman_times['dataset'], huffman_times['encoding_time']))

        ratios = []
        dataset_types = []
        vocab_sizes = []

        # enum_times already contains successful runs only.
        for _, row in enum_times.iterrows():
            dataset = row['dataset']
            if dataset in huffman_dict:
                ratios.append(row['encoding_time'] / huffman_dict[dataset])
                dataset_types.append(row['dataset_type'])
                vocab_sizes.append(row['vocabulary_size'])

        if ratios:
            colors = {'Uniform': 'blue', 'Zipf': 'red', 'Geometric': 'green', 'English Text': 'purple'}
            type_colors = [colors.get(dt, 'gray') for dt in dataset_types]

            ax3.scatter(vocab_sizes, ratios, c=type_colors, alpha=0.7, s=80)

            # Empty scatters serve as legend entries, one per type present.
            for dtype, color in colors.items():
                if dtype in dataset_types:
                    ax3.scatter([], [], c=color, label=dtype, alpha=0.7, s=80)

            ax3.set_title('Speed Ratio (Enumerative/Huffman) vs Vocabulary Size')
            ax3.set_xlabel('Vocabulary Size')
            ax3.set_ylabel('Time Ratio (Enum/Huffman)')
            ax3.set_xscale('log')
            ax3.set_yscale('log')
            ax3.axhline(y=1.0, color='black', linestyle='--', alpha=0.5, label='Equal speed')
            ax3.legend()
            ax3.grid(True, alpha=0.3)

    # Panel 4: distribution of encoding times per method.
    ax4 = axes[1, 1]
    huffman_successful = huffman_times[huffman_times['status'] == 'success']['encoding_time']
    enum_successful_times = enum_times['encoding_time']

    time_data = []
    labels = []

    if not huffman_successful.empty:
        time_data.append(huffman_successful.values)
        labels.append('Huffman')

    if not enum_successful_times.empty:
        time_data.append(enum_successful_times.values)
        labels.append('Enumerative')

    if time_data:
        bp = ax4.boxplot(time_data, tick_labels=labels, patch_artist=True)

        colors = ['lightblue', 'lightgreen']
        for patch, color in zip(bp['boxes'], colors[:len(bp['boxes'])]):
            patch.set_facecolor(color)
            patch.set_alpha(0.7)

        ax4.set_title('Encoding Time Distribution')
        ax4.set_ylabel('Encoding Time (seconds)')
        ax4.set_yscale('log')
        ax4.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig(out_dir / 'compression_time_comparison.png', dpi=300, bbox_inches='tight')
    plt.close(fig)

    # Text summary on stdout.
    print("\nCompression Time Summary:")
    print("=" * 50)

    if not huffman_times.empty:
        huff_stats = huffman_times['encoding_time']
        print("Huffman encoding times:")
        print(f"  Min: {huff_stats.min():.6f}s, Avg: {huff_stats.mean():.6f}s, Max: {huff_stats.max():.6f}s")

    if not enum_successful_times.empty:
        enum_stats = enum_successful_times
        print("Enumerative encoding times (successful):")
        print(f"  Min: {enum_stats.min():.3f}s, Avg: {enum_stats.mean():.3f}s, Max: {enum_stats.max():.3f}s")
        # Without Huffman timings the ratio would be NaN and print as
        # "nanx slower", so the comparison line is skipped.
        if not huffman_times.empty:
            slowdown = enum_stats.mean() / huffman_times['encoding_time'].mean()
            print(f"  Speed vs Huffman: {slowdown:.0f}x slower on average")
|
|
|
def main():
    """Load the results file, render every figure, and print the summary table.

    Returns early, without producing any output files, when the results file
    could not be loaded.
    """
    results = load_results()
    if results is None:
        return

    print("Loading compression results...")
    df = create_comparison_dataframe(results)

    print("Generating plots...")

    # Ensure the default output directory exists before any figure is saved.
    Path('plots').mkdir(exist_ok=True)

    # Every plotting routine takes the same DataFrame; run them in a fixed order.
    renderers = (
        plot_compression_ratios,
        plot_k_parameter_analysis,
        plot_distribution_comparison,
        plot_enumerative_timeout_analysis,
        plot_compression_time_comparison,
    )
    for render in renderers:
        render(df)

    # Textual report printed after the figures.
    generate_summary_table(df)

    print("\nPlots saved to 'plots/' directory")
    print("Analysis complete!")
|
|
|
|
# Run the full analysis only when this file is executed as a script, so that
# importing the module has no plotting side effects.
if __name__ == "__main__":
    main()