| """ |
| OCR and portfolio parsing module. |
| |
| Handles: |
| - Text extraction from portfolio screenshots using Tesseract OCR |
| - Parsing tickers and amounts using regex |
| - JSON validation for user-edited portfolio data |
| - Image preprocessing for better OCR accuracy |
| """ |
|
|
| import re |
| import json |
| from typing import Dict, Tuple, Optional |
| from PIL import Image, ImageEnhance, ImageFilter |
| import pytesseract |
| import numpy as np |
|
|
|
|
# Upper bound on the number of tickers validate_portfolio_json accepts in a
# single portfolio.
MAX_TICKERS = 100


# Generic ticker/amount regexes. parse_portfolio applies each one, in order,
# to the whole OCR text with re.MULTILINE | re.IGNORECASE and expects every
# match to carry two capture groups: a ticker and an amount, in either order.
TICKER_PATTERNS = [
    # Ticker, optional currency symbol, amount: "AAPL 5000", "AAPL $5,000.00"
    r'([A-Z]{1,5})\s*[\$€£]?\s*([\d,]+\.?\d*)',
    # Optional currency symbol, amount, whitespace, ticker: "$5,000 AAPL"
    r'[\$€£]?\s*([\d,]+\.?\d*)\s+([A-Z]{1,5})',
    # Ticker on one line, amount on the following line
    r'([A-Z]{1,5})\s*\n\s*[\$€£]?\s*([\d,]+\.?\d*)',
    # Ticker and amount separated by ":" or "|": "AAPL | $5,000.00"
    r'([A-Z]{1,5})\s*[:|]\s*[\$€£]?\s*([\d,]+\.?\d*)',
    # Number, ticker, dash, number (looks like a broker detail line such as
    # "8,31 MU - 386,56" -- TODO confirm the intended source format).
    # NOTE(review): this pattern has only ONE capture group, so re.findall
    # returns bare strings for it instead of (ticker, amount) pairs.
    r'[\d,]+\.?\d*\s+([A-Z]{2,5})\s*[-–]\s*[\d,]+',
]


# Two-line pattern: amount followed by a currency symbol, a newline, then a
# 2-5 letter ticker followed by a dash.
# NOTE(review): not referenced anywhere in the visible part of this file;
# parse_revolut_format uses its own inline regexes. Verify before removing.
REVOLUT_PATTERN = r'([\d,]+\.?\d*)\s*[\$€£]\s*\n.*?\s+([A-Z]{2,5})\s*[-–]'
|
|
|
|
def is_dark_theme(image: Image.Image) -> bool:
    """
    Detect if image uses dark theme (dark background, light text).

    The image is converted to grayscale and the mean brightness of its
    central region (the middle half along each axis) is compared against
    128, the midpoint of the 8-bit range.

    Args:
        image: PIL Image object

    Returns:
        True if the sampled mean brightness is below 128, False otherwise
        (including for an image with no pixels at all)
    """
    gray = image.convert('L')

    # Sample only the centre so that borders and edge chrome do not skew
    # the average.
    width, height = gray.size
    center = gray.crop((
        width // 4,
        height // 4,
        3 * width // 4,
        3 * height // 4,
    ))

    pixels = np.array(center)
    if pixels.size == 0:
        # Images narrower or shorter than 2 px produce an empty central
        # crop; the mean of an empty array is NaN (plus a RuntimeWarning),
        # so fall back to the whole image instead.
        pixels = np.array(gray)
    if pixels.size == 0:
        return False

    # Cast so callers receive a plain bool rather than numpy.bool_.
    return bool(pixels.mean() < 128)
|
|
|
|
def preprocess_image(image: Image.Image) -> Image.Image:
    """
    Prepare a screenshot for OCR.

    Steps, in the order they are applied:
    1. Grayscale conversion; when is_dark_theme() reports a dark image, the
       RGB colours are inverted first
    2. Contrast enhancement by a factor of 2.0
    3. Sharpening with PIL's SHARPEN filter
    4. If either side is shorter than 800 px, LANCZOS upscaling by
       max(800 / width, 800 / height)

    Args:
        image: PIL Image object

    Returns:
        Preprocessed grayscale PIL Image object
    """
    min_side = 800

    if is_dark_theme(image):
        from PIL import ImageOps
        rgb = image.convert('RGB')
        gray = ImageOps.invert(rgb).convert('L')
    else:
        gray = image.convert('L')

    boosted = ImageEnhance.Contrast(gray).enhance(2.0)
    sharpened = boosted.filter(ImageFilter.SHARPEN)

    width, height = sharpened.size
    if width >= min_side and height >= min_side:
        # Already large enough; no resampling needed.
        return sharpened

    factor = max(min_side / width, min_side / height)
    target_size = (int(width * factor), int(height * factor))
    return sharpened.resize(target_size, Image.Resampling.LANCZOS)
|
|
|
|
def extract_text_from_image(image: Image.Image) -> Tuple[Optional[str], Optional[str]]:
    """
    Run Tesseract OCR on a portfolio screenshot.

    The image is passed through preprocess_image() and then recognised with
    page segmentation mode 6; if that yields only whitespace, mode 4 is
    tried as a fallback.

    Args:
        image: PIL Image object

    Returns:
        Tuple of (extracted_text, error_message)
        - If successful: (text, None)
        - If failed: (None, error_message)
    """
    # Tesseract configurations, tried in order until one yields text.
    configs = (r'--oem 3 --psm 6', r'--oem 3 --psm 4')

    try:
        # Probe the engine up front; a missing install is reported through
        # the TesseractNotFoundError handler below.
        pytesseract.get_tesseract_version()

        prepared = preprocess_image(image)

        for config in configs:
            text = pytesseract.image_to_string(prepared, config=config)
            if text.strip():
                return text, None

        return None, "No text detected in image. Please upload a clearer screenshot or enter data manually."

    except pytesseract.TesseractNotFoundError:
        return None, "OCR engine (Tesseract) not available. Please check installation."
    except Exception as exc:
        # Boundary handler: any other failure is reported, never raised.
        return None, f"OCR failed: {str(exc)}"
|
|
|
|
# Uppercase tokens the ticker regex can capture that are not holdings
# (clock markers and currency codes).
_REVOLUT_NON_TICKERS = frozenset({'AM', 'PM', 'USD', 'EUR', 'GBP', 'JPY', 'CHF'})

# Values below this threshold are not treated as a position's portfolio value.
_REVOLUT_MIN_VALUE = 50

# A line that starts with "<number> <TICKER><separator>" is a detail line,
# never a value line.
_REVOLUT_TICKER_LINE_RE = re.compile(r'^[\d,]+[.,]?\d*\s*[A-Z]{2,5}[\s\-–:*«»]')

# Number (digits, spaces, commas, optional 1-2 digit decimals) followed by a
# currency symbol.
_REVOLUT_VALUE_RE = re.compile(r'(?<![\-–])([\d\s,]+(?:[.,]\d{1,2})?)[:\']?\s*[\$€£]')

# "<number> <TICKER><separator>" anywhere in a line; group 1 is the ticker.
_REVOLUT_TICKER_RE = re.compile(r'[\d,]+[.,]?\d*\s*([A-Z]{2,5})[\s\-–:*«»]')


def _parse_revolut_value(raw: str) -> Optional[float]:
    """Convert a captured value such as " 3 212,85 " to a float, or None."""
    # The value regex accepts any whitespace inside the number (tabs,
    # non-breaking spaces, ...), so strip all of it, not just ' '.
    compact = re.sub(r'\s+', '', raw)

    if not re.search(r'[.,]', compact):
        # No separator at all, e.g. "172312": the last two digits are taken
        # as the decimals.
        if len(compact) > 2:
            compact = compact[:-2] + '.' + compact[-2:]
    elif '.' in compact:
        # The regex only admits '.' as the final decimal separator, so any
        # commas before it are thousands separators: "1,234.56".
        compact = compact.replace(',', '')
    else:
        # Decimal comma: "3212,85".
        compact = compact.replace(',', '.')

    try:
        return float(compact)
    except ValueError:
        return None


def parse_revolut_format(text: str) -> Dict[str, float]:
    """
    Parse Revolut-specific format.

    Revolut format (typically 2 lines per stock):
    Line 1: [icon] Company Name [portfolio_value]$
    Line 2: [shares] TICKER[separator] [price_per_share]$ [change%]

    Examples:
    Line 1: "@ Micron Technology 3 212,85 $"
    Line 2: "8,31 MU» 386,56 $ 4 109,73%"

    Handles variations:
    - Whitespace inside numbers: "3 256,40"
    - Decimal comma ("212,85") and thousands comma with decimal point
      ("1,234.56")
    - Different separators after ticker: "-", ":", "*", "»", "«"
    - Numbers without decimals: "172312" (read as 1723.12)

    A value line is paired with the first valid ticker found on one of the
    next two lines. Values below 50 are ignored, and when a ticker occurs
    more than once the first value is kept.

    Args:
        text: Extracted text from OCR

    Returns:
        Dictionary mapping tickers to amounts (empty if nothing matched)
    """
    if not text:
        return {}

    portfolio: Dict[str, float] = {}
    lines = [line.strip() for line in text.split('\n')]
    total = len(lines)

    i = 0
    while i < total:
        line = lines[i]

        # Blank lines and ticker/detail lines never carry a portfolio value.
        if not line or _REVOLUT_TICKER_LINE_RE.match(line):
            i += 1
            continue

        value_match = _REVOLUT_VALUE_RE.search(line)
        amount = _parse_revolut_value(value_match.group(1)) if value_match else None
        if amount is None or amount < _REVOLUT_MIN_VALUE:
            i += 1
            continue

        # Look for the ticker on the next one or two lines.
        consumed = 1
        for lookahead in (1, 2):
            if i + lookahead >= total:
                break

            ticker_match = _REVOLUT_TICKER_RE.search(lines[i + lookahead])
            if not ticker_match:
                continue

            ticker = ticker_match.group(1)
            if ticker in _REVOLUT_NON_TICKERS:
                continue

            # Keep the first value seen for a ticker.
            portfolio.setdefault(ticker, amount)
            # Skip past the ticker line that was just consumed.
            consumed = lookahead + 1
            break

        i += consumed

    return portfolio
|
|
|
|
# Tokens that look like tickers but are labels, currency codes, clock markers
# or month abbreviations. Matched exactly, so real tickers that merely start
# with one of them (e.g. "MARA", "NOVA") are kept.
_NON_TICKER_WORDS = frozenset({
    'ID', 'USD', 'EUR', 'GBP', 'JPY', 'CHF',
    'AM', 'PM',
    'JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN',
    'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC',
})


def _to_amount(raw: str) -> float:
    """Convert a captured amount to float; raises ValueError if impossible."""
    cleaned = re.sub(r'[\$€£\s]', '', raw)

    if '.' not in cleaned:
        # Without a decimal point, a final comma followed by one or two
        # digits is a decimal comma ("5000,50"); thousands groups always
        # have three digits ("5,000").
        head, sep, tail = cleaned.rpartition(',')
        if sep and len(tail) in (1, 2) and tail.isdigit():
            cleaned = f"{head.replace(',', '')}.{tail}"

    # Any comma still present is a thousands separator.
    return float(cleaned.replace(',', ''))


def parse_portfolio(text: str) -> Dict[str, float]:
    """
    Parse portfolio from extracted text using multiple regex patterns.

    Tries various patterns to handle different screenshot formats:
    - Revolut format (priority; if it yields anything, that result is
      returned as-is)
    - Ticker followed by amount: "AAPL 5000" or "AAPL $5,000.00"
    - Amount followed by ticker: "$5,000 AAPL"
    - Multi-line format: ticker on one line, amount on next
    - With separators: "AAPL | $5,000.00"

    For the generic patterns, matching is case-insensitive and tickers are
    upper-cased. Amounts must be greater than 1, and the largest amount seen
    for a ticker wins. Currency codes, "ID", "AM"/"PM", month abbreviations
    and tokens of six or more letters are discarded as false positives.

    Args:
        text: Extracted text from OCR

    Returns:
        Dictionary mapping tickers to amounts: {ticker: amount}
        Returns empty dict if no valid tickers found
    """
    if not text:
        return {}

    revolut_portfolio = parse_revolut_format(text)
    if revolut_portfolio:
        return revolut_portfolio

    portfolio: Dict[str, float] = {}

    for pattern in TICKER_PATTERNS:
        for match in re.findall(pattern, text, re.MULTILINE | re.IGNORECASE):
            # re.findall yields plain strings for patterns with a single
            # capture group; those carry no (ticker, amount) pair.
            if not isinstance(match, tuple) or len(match) != 2:
                continue

            first, second = match

            # The purely numeric group is the amount; the other the ticker.
            if re.match(r'^[\d,.]+$', first):
                amount_str, ticker = first, second.upper()
            else:
                ticker, amount_str = first.upper(), second

            if not re.fullmatch(r'[A-Z]{1,10}', ticker):
                continue
            if ticker in _NON_TICKER_WORDS or len(ticker) >= 6:
                continue

            try:
                amount = _to_amount(amount_str)
            except ValueError:
                continue

            # Ignore tiny values; keep the largest amount per ticker.
            if amount > 1 and (ticker not in portfolio or amount > portfolio[ticker]):
                portfolio[ticker] = amount

    return portfolio
|
|
|
|
def validate_portfolio_json(json_str: str) -> Tuple[bool, Optional[Dict[str, float]], str]:
    """
    Validate user-edited portfolio JSON.

    Expected format: {"AAPL": 5000, "GOOGL": 3000, ...}

    Rules:
    - The document must be a non-empty JSON object
    - Tickers are 1-10 characters long and uppercase
    - Amounts must convert to a finite float greater than zero; booleans,
      NaN and Infinity are rejected
    - At most MAX_TICKERS entries

    Validation stops at the first problem found. The function reports
    failures through its return value and is not expected to raise for
    string input.

    Args:
        json_str: JSON string to validate

    Returns:
        Tuple of (is_valid, parsed_dict, error_message)
        - If valid: (True, portfolio_dict, "")
        - If invalid: (False, None, error_message)
    """
    import math

    if not json_str or not json_str.strip():
        return False, None, "JSON is empty"

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        return False, None, f"Invalid JSON format: {str(e)}"
    except Exception as e:
        # E.g. RecursionError on absurdly nested input; keep reporting
        # instead of raising.
        return False, None, f"Validation error: {str(e)}"

    if not isinstance(data, dict):
        return False, None, "JSON must be a dictionary/object, not a list or other type"

    portfolio: Dict[str, float] = {}
    for ticker, amount in data.items():
        if not isinstance(ticker, str):
            return False, None, f"Ticker '{ticker}' must be a string"

        # Check length before case: "".isupper() is False, which would
        # otherwise mislabel an empty ticker as a casing problem.
        if not 1 <= len(ticker) <= 10:
            return False, None, f"Ticker '{ticker}' length should be between 1-10 characters"

        if not ticker.isupper():
            return False, None, f"Ticker '{ticker}' should be uppercase (e.g., 'AAPL' not 'aapl')"

        # bool is an int subclass, so float(True) would silently become 1.0.
        if isinstance(amount, bool):
            return False, None, f"Amount for {ticker} must be a number, got: {amount}"

        try:
            amount_float = float(amount)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: an integer too large to be represented as float.
            return False, None, f"Amount for {ticker} must be a number, got: {amount}"

        # json.loads accepts NaN/Infinity literals, and NaN slips through a
        # plain "<= 0" comparison.
        if not math.isfinite(amount_float):
            return False, None, f"Amount for {ticker} must be a finite number, got: {amount}"

        if amount_float <= 0:
            return False, None, f"Amount for {ticker} must be positive, got: {amount_float}"

        portfolio[ticker] = amount_float

    if not portfolio:
        return False, None, "Portfolio must contain at least one ticker"

    if len(portfolio) > MAX_TICKERS:
        return False, None, f"Portfolio exceeds maximum of {MAX_TICKERS} tickers"

    return True, portfolio, ""
|
|
|
|
def merge_portfolios(portfolios: list[Dict[str, float]]) -> Dict[str, float]:
    """
    Combine several portfolios into one.

    A ticker that occurs in more than one portfolio ends up with the sum of
    its amounts. The input dictionaries are not modified.

    Args:
        portfolios: List of portfolio dictionaries

    Returns:
        New dictionary mapping each ticker to its total amount
    """
    totals = {}

    for holdings in portfolios:
        for symbol, value in holdings.items():
            try:
                totals[symbol] += value
            except KeyError:
                # First occurrence of this ticker.
                totals[symbol] = value

    return totals
|
|
|
|
def format_portfolio_json(portfolio: Dict[str, float], indent: int = 2) -> str:
    """
    Serialize a portfolio to human-readable JSON.

    Keys are emitted in sorted order.

    Args:
        portfolio: Dictionary of {ticker: amount}
        indent: Number of spaces for indentation

    Returns:
        Formatted JSON string
    """
    rendered = json.dumps(
        portfolio,
        sort_keys=True,
        indent=indent,
    )
    return rendered
|
|