Kaanta / transaction_aggregator.py
Oluwaferanmi
This is the latest changes
66d6b11
# transaction_aggregator.py
"""
Transaction Aggregator for Tax Optimization
Aggregates classified transactions into tax calculation inputs
"""
from __future__ import annotations
from typing import Dict, List, Any, Optional
from datetime import datetime, date
from collections import defaultdict
class TransactionAggregator:
"""
Aggregates classified transactions into inputs for the TaxEngine
"""
def __init__(self):
pass
def aggregate_for_tax_year(
self,
classified_transactions: List[Dict[str, Any]],
tax_year: int
) -> Dict[str, float]:
"""
Aggregate transactions into tax calculation inputs
Args:
classified_transactions: List of transactions with tax_category field
tax_year: Year to aggregate for
Returns:
Dictionary compatible with TaxEngine.run() inputs parameter
"""
# Filter transactions for the tax year
year_transactions = self._filter_by_year(classified_transactions, tax_year)
# Initialize aggregation buckets
aggregated = {
# Income components
"gross_income": 0.0,
"basic": 0.0,
"housing": 0.0,
"transport": 0.0,
"bonus": 0.0,
"other_allowances": 0.0,
# Deductions
"employee_pension_contribution": 0.0,
"nhf": 0.0,
"life_insurance": 0.0,
"union_dues": 0.0,
# Additional (for 2026 rules)
"annual_rent_paid": 0.0,
# Business-related (for CIT)
"assessable_profits": 0.0,
"turnover_annual": 0.0,
# Required for minimum wage exemption rule
"employment_income_annual": 0.0,
"min_wage_monthly": 70000.0, # Current Nigerian minimum wage
}
# Aggregate by category
for tx in year_transactions:
category = tx.get("tax_category", "uncategorized")
amount = abs(float(tx.get("amount", 0)))
tx_type = tx.get("type", "").lower()
# Income categories (credits)
if tx_type == "credit":
if category == "employment_income":
aggregated["gross_income"] += amount
# Try to parse salary breakdown from metadata
metadata = tx.get("metadata", {})
if metadata:
aggregated["basic"] += metadata.get("basic_salary", 0)
aggregated["housing"] += metadata.get("housing_allowance", 0)
aggregated["transport"] += metadata.get("transport_allowance", 0)
aggregated["bonus"] += metadata.get("bonus", 0)
else:
# If no breakdown, assume it's all basic
aggregated["basic"] += amount
elif category == "business_income":
aggregated["turnover_annual"] += amount
# Simplified: assume 30% profit margin
aggregated["assessable_profits"] += amount * 0.30
elif category == "rental_income":
aggregated["gross_income"] += amount
aggregated["other_allowances"] += amount
# Deduction categories (debits)
elif tx_type == "debit":
if category == "pension_contribution":
aggregated["employee_pension_contribution"] += amount
elif category == "nhf_contribution":
aggregated["nhf"] += amount
elif category == "life_insurance":
aggregated["life_insurance"] += amount
elif category == "union_dues":
aggregated["union_dues"] += amount
elif category == "rent_paid":
aggregated["annual_rent_paid"] += amount
# Ensure gross_income includes all components
if aggregated["basic"] > 0:
aggregated["gross_income"] = (
aggregated["basic"] +
aggregated["housing"] +
aggregated["transport"] +
aggregated["bonus"] +
aggregated["other_allowances"]
)
# Set employment_income_annual (same as gross_income for employed individuals)
aggregated["employment_income_annual"] = aggregated["gross_income"]
return aggregated
def _filter_by_year(
self,
transactions: List[Dict[str, Any]],
year: int
) -> List[Dict[str, Any]]:
"""Filter transactions by tax year"""
filtered = []
for tx in transactions:
tx_date = tx.get("date")
# Handle different date formats
if isinstance(tx_date, str):
try:
tx_date = datetime.fromisoformat(tx_date.replace('Z', '+00:00'))
except:
try:
tx_date = datetime.strptime(tx_date, "%Y-%m-%d")
except:
continue
if isinstance(tx_date, datetime):
tx_date = tx_date.date()
if isinstance(tx_date, date) and tx_date.year == year:
filtered.append(tx)
return filtered
def identify_optimization_opportunities(
self,
aggregated: Dict[str, float],
tax_year: int = 2025
) -> List[Dict[str, Any]]:
"""
Identify missing or suboptimal deductions
Returns list of optimization opportunities
"""
opportunities = []
gross_income = aggregated.get("gross_income", 0)
if gross_income == 0:
return opportunities
# 1. Pension optimization
current_pension = aggregated.get("employee_pension_contribution", 0)
optimal_pension = gross_income * 0.20 # Max 20% is deductible
mandatory_pension = gross_income * 0.08 # Minimum 8% mandatory
if current_pension < optimal_pension:
potential_additional = optimal_pension - current_pension
# Estimate tax savings (using average rate of 21%)
estimated_savings = potential_additional * 0.21
opportunities.append({
"type": "increase_pension",
"category": "pension_contribution",
"current_annual": current_pension,
"optimal_annual": optimal_pension,
"additional_contribution": potential_additional,
"estimated_tax_savings": estimated_savings,
"priority": "high" if current_pension < mandatory_pension else "medium",
"description": f"Increase pension contributions by ₦{potential_additional:,.0f}/year",
"implementation": "Contact your PFA to set up Additional Voluntary Contribution (AVC)"
})
# 2. Life insurance
current_insurance = aggregated.get("life_insurance", 0)
if current_insurance == 0:
suggested_premium = min(100000, gross_income * 0.02) # 2% of income, max ₦100K
estimated_savings = suggested_premium * 0.21
opportunities.append({
"type": "add_life_insurance",
"category": "life_insurance",
"current_annual": 0,
"optimal_annual": suggested_premium,
"additional_contribution": suggested_premium,
"estimated_tax_savings": estimated_savings,
"priority": "medium",
"description": f"Purchase life insurance policy (₦{suggested_premium:,.0f}/year premium)",
"implementation": "Get quotes from licensed insurers. Keep premium receipts for tax filing."
})
# 3. NHF contribution
current_nhf = aggregated.get("nhf", 0)
basic_salary = aggregated.get("basic", gross_income * 0.6) # Estimate if not available
expected_nhf = basic_salary * 0.025 # 2.5% of basic
if current_nhf < expected_nhf * 0.5: # Less than half of expected
opportunities.append({
"type": "verify_nhf",
"category": "nhf_contribution",
"current_annual": current_nhf,
"optimal_annual": expected_nhf,
"additional_contribution": expected_nhf - current_nhf,
"estimated_tax_savings": (expected_nhf - current_nhf) * 0.21,
"priority": "low",
"description": "Verify NHF contributions are being deducted",
"implementation": "Check with employer that 2.5% of basic salary goes to NHF"
})
# 4. Rent relief (for 2026)
if tax_year >= 2026:
annual_rent = aggregated.get("annual_rent_paid", 0)
if annual_rent > 0:
max_relief = min(500000, annual_rent * 0.20)
estimated_savings = max_relief * 0.21
opportunities.append({
"type": "claim_rent_relief",
"category": "rent_paid",
"current_annual": annual_rent,
"optimal_annual": annual_rent,
"relief_amount": max_relief,
"estimated_tax_savings": estimated_savings,
"priority": "high",
"description": f"Claim rent relief of ₦{max_relief:,.0f} under NTA 2025",
"implementation": "Gather rent receipts and landlord documentation for tax filing"
})
# Sort by priority and estimated savings
priority_order = {"high": 0, "medium": 1, "low": 2}
opportunities.sort(
key=lambda x: (priority_order.get(x["priority"], 3), -x["estimated_tax_savings"])
)
return opportunities
def get_income_breakdown(
self,
classified_transactions: List[Dict[str, Any]],
tax_year: int
) -> Dict[str, Any]:
"""
Get detailed breakdown of income sources
"""
year_transactions = self._filter_by_year(classified_transactions, tax_year)
income_by_source = defaultdict(float)
income_by_month = defaultdict(float)
for tx in year_transactions:
if tx.get("type", "").lower() == "credit":
category = tx.get("tax_category", "uncategorized")
amount = abs(float(tx.get("amount", 0)))
income_by_source[category] += amount
# Monthly breakdown
tx_date = tx.get("date")
if isinstance(tx_date, str):
try:
tx_date = datetime.fromisoformat(tx_date.replace('Z', '+00:00'))
except:
tx_date = datetime.strptime(tx_date, "%Y-%m-%d")
if isinstance(tx_date, (datetime, date)):
month_key = f"{tax_year}-{tx_date.month:02d}"
income_by_month[month_key] += amount
total_income = sum(income_by_source.values())
return {
"total_annual_income": total_income,
"income_by_source": dict(income_by_source),
"income_by_month": dict(sorted(income_by_month.items())),
"average_monthly_income": total_income / 12 if total_income > 0 else 0
}
def get_deduction_breakdown(
self,
classified_transactions: List[Dict[str, Any]],
tax_year: int
) -> Dict[str, Any]:
"""
Get detailed breakdown of deductions
"""
year_transactions = self._filter_by_year(classified_transactions, tax_year)
deductions_by_type = defaultdict(float)
for tx in year_transactions:
if tx.get("type", "").lower() == "debit" and tx.get("deductible", False):
category = tx.get("tax_category", "uncategorized")
amount = abs(float(tx.get("amount", 0)))
deductions_by_type[category] += amount
total_deductions = sum(deductions_by_type.values())
return {
"total_annual_deductions": total_deductions,
"deductions_by_type": dict(deductions_by_type),
"deduction_count": len([t for t in year_transactions if t.get("deductible", False)])
}