File size: 8,603 Bytes
1313d86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""
CSV / Excel parser for bank statement uploads.
Supports flexible column name mapping.
"""
import pandas as pd
import io
from datetime import datetime
from typing import Optional
from loguru import logger


COLUMN_ALIASES = {
    "date": ["date", "txn date", "transaction date", "value date", "trans date",
             "booking date", "posting date", "transaction_date", "txn_date", "date_time"],
    "description": ["description", "narration", "particulars", "remarks", "details",
                    "transaction details", "trans details", "merchant", "raw_description",
                    "recipient_name", "recipient", "note", "transaction_remarks"],
    "amount": ["amount", "transaction amount", "debit/credit amount", "debit amount",
               "credit amount", "txn amount"],
    "debit": ["debit", "withdrawal", "dr", "debit amount"],
    "credit": ["credit", "deposit", "cr", "credit amount"],
    "balance": ["balance", "closing balance", "available balance", "running balance"],
    "method": ["method", "mode", "payment method", "type", "transaction type",
               "payment mode", "channel"],
}


def _find_column(df_cols: list, aliases: list) -> Optional[str]:
    """Find the first matching column from aliases (case-insensitive)."""
    lower_cols = {c.lower().strip(): c for c in df_cols}
    for alias in aliases:
        if alias.lower() in lower_cols:
            return lower_cols[alias.lower()]
    return None


def _detect_direction(row, debit_col: Optional[str], credit_col: Optional[str],
                      amount_col: Optional[str], desc: str) -> tuple:
    """Determine if a transaction is debit or credit and return amount."""
    amount = 0.0
    direction = "debit"

    if debit_col and credit_col:
        # Separate debit/credit columns
        debit_val = row.get(debit_col, None)
        credit_val = row.get(credit_col, None)
        try:
            debit_amt = float(str(debit_val).replace(',', '').strip()) if pd.notna(debit_val) and str(debit_val).strip() not in ('', '-', 'nan') else 0
        except (ValueError, TypeError):
            debit_amt = 0
        try:
            credit_amt = float(str(credit_val).replace(',', '').strip()) if pd.notna(credit_val) and str(credit_val).strip() not in ('', '-', 'nan') else 0
        except (ValueError, TypeError):
            credit_amt = 0

        if debit_amt > 0:
            amount = debit_amt
            direction = "debit"
        elif credit_amt > 0:
            amount = credit_amt
            direction = "credit"
    elif amount_col:
        raw = str(row.get(amount_col, 0)).replace(',', '').strip()
        try:
            amount = abs(float(raw))
        except (ValueError, TypeError):
            amount = 0
        # Infer direction from sign or description keywords
        try:
            if float(raw) < 0:
                direction = "debit"
            else:
                direction = "credit"
        except (ValueError, TypeError):
            desc_lower = desc.lower()
            direction = "credit" if any(w in desc_lower for w in ['salary', 'credit', 'received', 'refund']) else "debit"

    return amount, direction


def _detect_payment_method(desc: str, method_val: Optional[str]) -> str:
    if method_val and str(method_val).strip().upper() not in ('NAN', ''):
        mv = str(method_val).strip().upper()
        for m in ['UPI', 'IMPS', 'NEFT', 'ATM']:
            if m in mv:
                return m
    desc_upper = desc.upper()
    for m in ['UPI', 'IMPS', 'NEFT', 'ATM']:
        if m in desc_upper:
            return m
    return 'OTHER'


def parse_bank_statement(file_bytes: bytes, filename: str) -> list:
    """
    Parse a CSV or Excel bank statement and return a list of normalized transaction dicts.
    """
    try:
        if filename.lower().endswith(('.xlsx', '.xls')):
            df = pd.read_excel(io.BytesIO(file_bytes), engine='openpyxl')
        else:
            # Try multiple encodings
            for enc in ['utf-8', 'latin-1', 'cp1252']:
                try:
                    df = pd.read_csv(io.BytesIO(file_bytes), encoding=enc, skip_blank_lines=True)
                    break
                except UnicodeDecodeError:
                    continue
            else:
                raise ValueError("Could not decode CSV file")
    except Exception as e:
        raise ValueError(f"Failed to read file: {e}")

    # Strip whitespace from column names
    df.columns = [str(c).strip() for c in df.columns]

    # Drop rows that are completely empty
    df = df.dropna(how='all')

    cols = df.columns.tolist()
    date_col = _find_column(cols, COLUMN_ALIASES["date"])
    desc_col = _find_column(cols, COLUMN_ALIASES["description"])
    amount_col = _find_column(cols, COLUMN_ALIASES["amount"])
    debit_col = _find_column(cols, COLUMN_ALIASES["debit"])
    credit_col = _find_column(cols, COLUMN_ALIASES["credit"])
    balance_col = _find_column(cols, COLUMN_ALIASES["balance"])
    method_col = _find_column(cols, COLUMN_ALIASES["method"])

    if not date_col or not desc_col:
        raise ValueError(
            f"Could not auto-detect required columns (date, description). "
            f"Found: {cols}. "
            f"Please ensure your CSV has columns like: date, description, amount."
        )

    if not amount_col and not (debit_col and credit_col):
        raise ValueError(
            "Could not find amount column(s). "
            "Need either 'amount' or separate 'debit'/'credit' columns."
        )

    transactions = []
    skipped = 0
    for _, row in df.iterrows():
        try:
            # Parse date
            raw_date = row[date_col]
            if pd.isna(raw_date):
                skipped += 1
                continue
            if isinstance(raw_date, datetime):
                txn_date = raw_date
            else:
                raw_date_str = str(raw_date).strip()
                for fmt in [
                    '%d-%m-%Y', '%d/%m/%Y', '%Y-%m-%d', '%m/%d/%Y',
                    '%d %b %Y', '%d %B %Y', '%b %d, %Y', '%d-%b-%Y',
                    '%d/%m/%y', '%m/%d/%y',
                ]:
                    try:
                        txn_date = datetime.strptime(raw_date_str, fmt)
                        break
                    except ValueError:
                        continue
                else:
                    try:
                        txn_date = pd.to_datetime(raw_date_str, dayfirst=True).to_pydatetime()
                    except Exception:
                        skipped += 1
                        continue

            # Build a rich description if multiple columns exist
            all_desc_parts = []
            main_desc = str(row[desc_col]).strip()
            if main_desc and main_desc.lower() != 'nan':
                all_desc_parts.append(main_desc)
            
            # Check for extra info in common columns like UPI_ID or Note
            for extra_col_alias in ["upi_id", "note", "remarks", "category_hint"]:
                found_col = _find_column(cols, [extra_col_alias])
                if found_col and found_col != desc_col:
                    val = str(row[found_col]).strip()
                    if val and val.lower() != 'nan' and val not in all_desc_parts:
                        all_desc_parts.append(val)
            
            desc = " | ".join(all_desc_parts)
            if not desc:
                skipped += 1
                continue

            amount, direction = _detect_direction(row, debit_col, credit_col, amount_col, desc)
            if amount <= 0:
                skipped += 1
                continue

            # Balance
            balance = None
            if balance_col:
                try:
                    bal_raw = str(row[balance_col]).replace(',', '').strip()
                    balance = float(bal_raw) if bal_raw not in ('nan', '', '-') else None
                except (ValueError, TypeError):
                    balance = None

            method = _detect_payment_method(desc, row.get(method_col) if method_col else None)

            transactions.append({
                "raw_description": desc,
                "amount": round(amount, 2),
                "direction": direction,
                "payment_method": method,
                "transaction_date": txn_date,
                "balance": balance,
            })
        except Exception as e:
            logger.warning(f"Skipped row: {e}")
            skipped += 1

    logger.info(f"Parsed {len(transactions)} transactions, skipped {skipped} rows from '{filename}'")
    return transactions