# transaction_aggregator.py """ Transaction Aggregator for Tax Optimization Aggregates classified transactions into tax calculation inputs """ from __future__ import annotations from typing import Dict, List, Any, Optional from datetime import datetime, date from collections import defaultdict class TransactionAggregator: """ Aggregates classified transactions into inputs for the TaxEngine """ def __init__(self): pass def aggregate_for_tax_year( self, classified_transactions: List[Dict[str, Any]], tax_year: int ) -> Dict[str, float]: """ Aggregate transactions into tax calculation inputs Args: classified_transactions: List of transactions with tax_category field tax_year: Year to aggregate for Returns: Dictionary compatible with TaxEngine.run() inputs parameter """ # Filter transactions for the tax year year_transactions = self._filter_by_year(classified_transactions, tax_year) # Initialize aggregation buckets aggregated = { # Income components "gross_income": 0.0, "basic": 0.0, "housing": 0.0, "transport": 0.0, "bonus": 0.0, "other_allowances": 0.0, # Deductions "employee_pension_contribution": 0.0, "nhf": 0.0, "life_insurance": 0.0, "union_dues": 0.0, # Additional (for 2026 rules) "annual_rent_paid": 0.0, # Business-related (for CIT) "assessable_profits": 0.0, "turnover_annual": 0.0, # Required for minimum wage exemption rule "employment_income_annual": 0.0, "min_wage_monthly": 70000.0, # Current Nigerian minimum wage } # Aggregate by category for tx in year_transactions: category = tx.get("tax_category", "uncategorized") amount = abs(float(tx.get("amount", 0))) tx_type = tx.get("type", "").lower() # Income categories (credits) if tx_type == "credit": if category == "employment_income": aggregated["gross_income"] += amount # Try to parse salary breakdown from metadata metadata = tx.get("metadata", {}) if metadata: aggregated["basic"] += metadata.get("basic_salary", 0) aggregated["housing"] += metadata.get("housing_allowance", 0) aggregated["transport"] += metadata.get("transport_allowance", 0) aggregated["bonus"] += metadata.get("bonus", 0) else: # If no breakdown, assume it's all basic aggregated["basic"] += amount elif category == "business_income": aggregated["turnover_annual"] += amount # Simplified: assume 30% profit margin aggregated["assessable_profits"] += amount * 0.30 elif category == "rental_income": aggregated["gross_income"] += amount aggregated["other_allowances"] += amount # Deduction categories (debits) elif tx_type == "debit": if category == "pension_contribution": aggregated["employee_pension_contribution"] += amount elif category == "nhf_contribution": aggregated["nhf"] += amount elif category == "life_insurance": aggregated["life_insurance"] += amount elif category == "union_dues": aggregated["union_dues"] += amount elif category == "rent_paid": aggregated["annual_rent_paid"] += amount # Ensure gross_income includes all components if aggregated["basic"] > 0: aggregated["gross_income"] = ( aggregated["basic"] + aggregated["housing"] + aggregated["transport"] + aggregated["bonus"] + aggregated["other_allowances"] ) # Set employment_income_annual (same as gross_income for employed individuals) aggregated["employment_income_annual"] = aggregated["gross_income"] return aggregated def _filter_by_year( self, transactions: List[Dict[str, Any]], year: int ) -> List[Dict[str, Any]]: """Filter transactions by tax year""" filtered = [] for tx in transactions: tx_date = tx.get("date") # Handle different date formats if isinstance(tx_date, str): try: tx_date = datetime.fromisoformat(tx_date.replace('Z', '+00:00')) except: try: tx_date = datetime.strptime(tx_date, "%Y-%m-%d") except: continue if isinstance(tx_date, datetime): tx_date = tx_date.date() if isinstance(tx_date, date) and tx_date.year == year: filtered.append(tx) return filtered def identify_optimization_opportunities( self, aggregated: Dict[str, float], tax_year: int = 2025 ) -> List[Dict[str, Any]]: """ Identify missing or suboptimal deductions Returns list of optimization opportunities """ opportunities = [] gross_income = aggregated.get("gross_income", 0) if gross_income == 0: return opportunities # 1. Pension optimization current_pension = aggregated.get("employee_pension_contribution", 0) optimal_pension = gross_income * 0.20 # Max 20% is deductible mandatory_pension = gross_income * 0.08 # Minimum 8% mandatory if current_pension < optimal_pension: potential_additional = optimal_pension - current_pension # Estimate tax savings (using average rate of 21%) estimated_savings = potential_additional * 0.21 opportunities.append({ "type": "increase_pension", "category": "pension_contribution", "current_annual": current_pension, "optimal_annual": optimal_pension, "additional_contribution": potential_additional, "estimated_tax_savings": estimated_savings, "priority": "high" if current_pension < mandatory_pension else "medium", "description": f"Increase pension contributions by ₦{potential_additional:,.0f}/year", "implementation": "Contact your PFA to set up Additional Voluntary Contribution (AVC)" }) # 2. Life insurance current_insurance = aggregated.get("life_insurance", 0) if current_insurance == 0: suggested_premium = min(100000, gross_income * 0.02) # 2% of income, max ₦100K estimated_savings = suggested_premium * 0.21 opportunities.append({ "type": "add_life_insurance", "category": "life_insurance", "current_annual": 0, "optimal_annual": suggested_premium, "additional_contribution": suggested_premium, "estimated_tax_savings": estimated_savings, "priority": "medium", "description": f"Purchase life insurance policy (₦{suggested_premium:,.0f}/year premium)", "implementation": "Get quotes from licensed insurers. Keep premium receipts for tax filing." }) # 3. NHF contribution current_nhf = aggregated.get("nhf", 0) basic_salary = aggregated.get("basic", gross_income * 0.6) # Estimate if not available expected_nhf = basic_salary * 0.025 # 2.5% of basic if current_nhf < expected_nhf * 0.5: # Less than half of expected opportunities.append({ "type": "verify_nhf", "category": "nhf_contribution", "current_annual": current_nhf, "optimal_annual": expected_nhf, "additional_contribution": expected_nhf - current_nhf, "estimated_tax_savings": (expected_nhf - current_nhf) * 0.21, "priority": "low", "description": "Verify NHF contributions are being deducted", "implementation": "Check with employer that 2.5% of basic salary goes to NHF" }) # 4. Rent relief (for 2026) if tax_year >= 2026: annual_rent = aggregated.get("annual_rent_paid", 0) if annual_rent > 0: max_relief = min(500000, annual_rent * 0.20) estimated_savings = max_relief * 0.21 opportunities.append({ "type": "claim_rent_relief", "category": "rent_paid", "current_annual": annual_rent, "optimal_annual": annual_rent, "relief_amount": max_relief, "estimated_tax_savings": estimated_savings, "priority": "high", "description": f"Claim rent relief of ₦{max_relief:,.0f} under NTA 2025", "implementation": "Gather rent receipts and landlord documentation for tax filing" }) # Sort by priority and estimated savings priority_order = {"high": 0, "medium": 1, "low": 2} opportunities.sort( key=lambda x: (priority_order.get(x["priority"], 3), -x["estimated_tax_savings"]) ) return opportunities def get_income_breakdown( self, classified_transactions: List[Dict[str, Any]], tax_year: int ) -> Dict[str, Any]: """ Get detailed breakdown of income sources """ year_transactions = self._filter_by_year(classified_transactions, tax_year) income_by_source = defaultdict(float) income_by_month = defaultdict(float) for tx in year_transactions: if tx.get("type", "").lower() == "credit": category = tx.get("tax_category", "uncategorized") amount = abs(float(tx.get("amount", 0))) income_by_source[category] += amount # Monthly breakdown tx_date = tx.get("date") if isinstance(tx_date, str): try: tx_date = datetime.fromisoformat(tx_date.replace('Z', '+00:00')) except: tx_date = datetime.strptime(tx_date, "%Y-%m-%d") if isinstance(tx_date, (datetime, date)): month_key = f"{tax_year}-{tx_date.month:02d}" income_by_month[month_key] += amount total_income = sum(income_by_source.values()) return { "total_annual_income": total_income, "income_by_source": dict(income_by_source), "income_by_month": dict(sorted(income_by_month.items())), "average_monthly_income": total_income / 12 if total_income > 0 else 0 } def get_deduction_breakdown( self, classified_transactions: List[Dict[str, Any]], tax_year: int ) -> Dict[str, Any]: """ Get detailed breakdown of deductions """ year_transactions = self._filter_by_year(classified_transactions, tax_year) deductions_by_type = defaultdict(float) for tx in year_transactions: if tx.get("type", "").lower() == "debit" and tx.get("deductible", False): category = tx.get("tax_category", "uncategorized") amount = abs(float(tx.get("amount", 0))) deductions_by_type[category] += amount total_deductions = sum(deductions_by_type.values()) return { "total_annual_deductions": total_deductions, "deductions_by_type": dict(deductions_by_type), "deduction_count": len([t for t in year_transactions if t.get("deductible", False)]) }