| |
|
| | """
|
| | Main processing orchestrator that ties together all the manufacturing priority logic.
|
| | This module provides high-level functions that both the CLI and Gradio interfaces can use.
|
| | """
|
| |
|
| | import os
|
| | import pandas as pd
|
| | from typing import Dict, List, Tuple, Optional
|
| | from datetime import datetime
|
| |
|
| | from config import WEIGHTS
|
| | from sheet_reader import list_sheets, read_sheet
|
| | from priority_logic import compute_priority
|
| | from output_writer import save_with_instructions
|
| | from utils import prompt_weights
|
| |
|
| |
|
| | class ManufacturingProcessor:
|
| | """
|
| | Main processor class for manufacturing priority calculations.
|
| | Encapsulates all the logic needed to process Excel files and generate priority rankings.
|
| | """
|
| |
|
| | def __init__(self, weights: Optional[Dict[str, int]] = None):
|
| | """Initialize processor with weights"""
|
| | self.weights = weights or WEIGHTS.copy()
|
| | self.validate_weights()
|
| |
|
| | def validate_weights(self) -> None:
|
| | """Ensure weights sum to 100"""
|
| | total = sum(self.weights.values())
|
| | if total != 100:
|
| | raise ValueError(f"Weights must sum to 100, got {total}")
|
| |
|
| | def get_file_info(self, file_path: str) -> Dict:
|
| | """Get information about the Excel file"""
|
| | if not os.path.exists(file_path):
|
| | raise FileNotFoundError(f"File not found: {file_path}")
|
| |
|
| | try:
|
| | sheets = list_sheets(file_path)
|
| | file_size = os.path.getsize(file_path)
|
| |
|
| | return {
|
| | "file_path": file_path,
|
| | "file_name": os.path.basename(file_path),
|
| | "file_size": file_size,
|
| | "sheets": sheets,
|
| | "sheet_count": len(sheets)
|
| | }
|
| | except Exception as e:
|
| | raise Exception(f"Error reading file info: {e}")
|
| |
|
| | def validate_sheet_data(self, df: pd.DataFrame) -> Dict:
|
| | """Validate that the sheet has required columns and data"""
|
| | from priority_logic import REQUIRED_COLS
|
| |
|
| |
|
| | df_norm = df.copy()
|
| | df_norm.columns = [str(c).strip() for c in df_norm.columns]
|
| |
|
| |
|
| | missing_cols = [col for col in REQUIRED_COLS if col not in df_norm.columns]
|
| |
|
| |
|
| | validation_result = {
|
| | "valid": len(missing_cols) == 0,
|
| | "missing_columns": missing_cols,
|
| | "available_columns": list(df.columns),
|
| | "row_count": len(df),
|
| | "empty_rows": df.isnull().all(axis=1).sum(),
|
| | "data_issues": []
|
| | }
|
| |
|
| | if validation_result["valid"]:
|
| |
|
| | try:
|
| |
|
| | date_col = "Oldest Product Required First"
|
| | date_issues = pd.to_datetime(df_norm[date_col], errors='coerce').isnull().sum()
|
| | if date_issues > 0:
|
| | validation_result["data_issues"].append(f"{date_issues} invalid dates in '{date_col}'")
|
| |
|
| |
|
| | qty_col = "Quantity of Each Component"
|
| | qty_numeric = pd.to_numeric(df_norm[qty_col], errors='coerce')
|
| | qty_issues = qty_numeric.isnull().sum()
|
| | if qty_issues > 0:
|
| | validation_result["data_issues"].append(f"{qty_issues} non-numeric values in '{qty_col}'")
|
| |
|
| |
|
| | for col in REQUIRED_COLS:
|
| | if col in df_norm.columns:
|
| | empty_count = df_norm[col].isnull().sum()
|
| | if empty_count == len(df_norm):
|
| | validation_result["data_issues"].append(f"Column '{col}' is completely empty")
|
| |
|
| | except Exception as e:
|
| | validation_result["data_issues"].append(f"Data validation error: {e}")
|
| |
|
| | return validation_result
|
| |
|
| | def process_file(self,
|
| | file_path: str,
|
| | sheet_name: str,
|
| | min_qty: int = 50,
|
| | custom_weights: Dict[str, int] = None) -> Tuple[pd.DataFrame, Dict]:
|
| | """
|
| | Process a single sheet from an Excel file and return prioritized results.
|
| |
|
| | Returns:
|
| | Tuple of (processed_dataframe, processing_info)
|
| | """
|
| |
|
| |
|
| | weights = custom_weights or self.weights
|
| | if custom_weights:
|
| | temp_weights = custom_weights.copy()
|
| | if sum(temp_weights.values()) != 100:
|
| | raise ValueError("Custom weights must sum to 100")
|
| | else:
|
| | temp_weights = weights
|
| |
|
| |
|
| | df = read_sheet(file_path, sheet_name)
|
| | if df is None or df.empty:
|
| | raise ValueError("Sheet is empty or could not be read")
|
| |
|
| |
|
| | validation = self.validate_sheet_data(df)
|
| | if not validation["valid"]:
|
| | raise ValueError(f"Data validation failed: Missing columns {validation['missing_columns']}")
|
| |
|
| |
|
| | try:
|
| | processed_df = compute_priority(df, min_qty=min_qty, weights=temp_weights)
|
| | except Exception as e:
|
| | raise Exception(f"Priority calculation failed: {e}")
|
| |
|
| |
|
| | processing_info = {
|
| | "timestamp": datetime.now().isoformat(),
|
| | "file_name": os.path.basename(file_path),
|
| | "sheet_name": sheet_name,
|
| | "weights_used": temp_weights,
|
| | "min_quantity": min_qty,
|
| | "total_products": len(df),
|
| | "products_above_threshold": sum(processed_df["QtyThresholdOK"]),
|
| | "highest_priority_score": processed_df["PriorityScore"].max(),
|
| | "lowest_priority_score": processed_df["PriorityScore"].min(),
|
| | "validation_info": validation
|
| | }
|
| |
|
| | return processed_df, processing_info
|
| |
|
| | def save_results(self,
|
| | processed_df: pd.DataFrame,
|
| | output_path: str,
|
| | processing_info: Dict) -> str:
|
| | """Save processed results with full documentation"""
|
| |
|
| | try:
|
| | save_with_instructions(
|
| | processed_df,
|
| | output_path,
|
| | min_qty=processing_info["min_quantity"],
|
| | weights=processing_info["weights_used"]
|
| | )
|
| |
|
| |
|
| | self._add_processing_log(output_path, processing_info)
|
| |
|
| | return output_path
|
| |
|
| | except Exception as e:
|
| | raise Exception(f"Failed to save results: {e}")
|
| |
|
| | def _add_processing_log(self, output_path: str, processing_info: Dict):
|
| | """Add a processing log sheet to the output file"""
|
| | try:
|
| |
|
| | with pd.ExcelWriter(output_path, mode='a', engine='openpyxl', if_sheet_exists='replace') as writer:
|
| | log_data = []
|
| | log_data.append(["PROCESSING LOG"])
|
| | log_data.append([""])
|
| | log_data.append(["Processing Timestamp", processing_info["timestamp"]])
|
| | log_data.append(["Source File", processing_info["file_name"]])
|
| | log_data.append(["Sheet Processed", processing_info["sheet_name"]])
|
| | log_data.append([""])
|
| | log_data.append(["SETTINGS USED"])
|
| | log_data.append(["Age Weight", f"{processing_info['weights_used']['AGE_WEIGHT']}%"])
|
| | log_data.append(["Component Weight", f"{processing_info['weights_used']['COMPONENT_WEIGHT']}%"])
|
| | log_data.append(["Manual Weight", f"{processing_info['weights_used']['MANUAL_WEIGHT']}%"])
|
| | log_data.append(["Minimum Quantity", processing_info["min_quantity"]])
|
| | log_data.append([""])
|
| | log_data.append(["RESULTS SUMMARY"])
|
| | log_data.append(["Total Products", processing_info["total_products"]])
|
| | log_data.append(["Above Threshold", processing_info["products_above_threshold"]])
|
| | log_data.append(["Highest Priority Score", f"{processing_info['highest_priority_score']:.4f}"])
|
| | log_data.append(["Lowest Priority Score", f"{processing_info['lowest_priority_score']:.4f}"])
|
| |
|
| | if processing_info["validation_info"]["data_issues"]:
|
| | log_data.append([""])
|
| | log_data.append(["DATA ISSUES FOUND"])
|
| | for issue in processing_info["validation_info"]["data_issues"]:
|
| | log_data.append(["", issue])
|
| |
|
| | log_df = pd.DataFrame(log_data, columns=["Parameter", "Value"])
|
| | log_df.to_excel(writer, sheet_name='Processing_Log', index=False)
|
| |
|
| | except Exception as e:
|
| |
|
| | print(f"Warning: Could not add processing log: {e}")
|
| |
|
| |
|
| |
|
| | def quick_process(file_path: str,
|
| | sheet_name: str,
|
| | output_path: str = None,
|
| | min_qty: int = 50,
|
| | weights: Optional[Dict[str, int]] = None) -> str:
|
| | """
|
| | Quick processing function that handles the full workflow.
|
| |
|
| | Args:
|
| | file_path: Path to Excel file
|
| | sheet_name: Name of sheet to process
|
| | output_path: Where to save results (optional, will auto-generate if not provided)
|
| | min_qty: Minimum quantity threshold
|
| | weights: Custom weights dict (optional)
|
| |
|
| | Returns:
|
| | Path to generated output file
|
| | """
|
| | processor = ManufacturingProcessor(weights)
|
| |
|
| |
|
| | processed_df, processing_info = processor.process_file(
|
| | file_path, sheet_name, min_qty, weights
|
| | )
|
| |
|
| |
|
| | if output_path is None:
|
| | base_name = os.path.splitext(os.path.basename(file_path))[0]
|
| | output_dir = os.path.dirname(file_path)
|
| | output_path = os.path.join(output_dir, f"{base_name}_PRIORITY.xlsx")
|
| |
|
| |
|
| | return processor.save_results(processed_df, output_path, processing_info)
|
| |
|
| |
|
| | def get_file_preview(file_path: str, sheet_name: str, max_rows: int = 5) -> Dict:
|
| | """
|
| | Get a preview of the file data for validation purposes.
|
| |
|
| | Returns:
|
| | Dict containing preview info and sample data
|
| | """
|
| | processor = ManufacturingProcessor()
|
| |
|
| |
|
| | file_info = processor.get_file_info(file_path)
|
| |
|
| |
|
| | df = read_sheet(file_path, sheet_name)
|
| | sample_df = df.head(max_rows) if df is not None else pd.DataFrame()
|
| |
|
| |
|
| | validation = processor.validate_sheet_data(df) if df is not None else {"valid": False}
|
| |
|
| | return {
|
| | "file_info": file_info,
|
| | "sample_data": sample_df,
|
| | "validation": validation,
|
| | "preview_rows": len(sample_df)
|
| | } |