"""Process raw search results: clean, enrich with scraped content, and export."""

import json
import logging
from datetime import datetime
from typing import Dict, List
from urllib.parse import urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup

from config import RAW_DIR, PROCESSED_DIR, LOG_DIR


class DataProcessor:
    """Processes raw search results into cleaned, enriched datasets."""

    def __init__(self):
        # Log to a timestamped file so each run is traceable
        log_file = LOG_DIR / f"processor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            filename=log_file
        )

        self.processed_data = {}

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        try:
            return urlparse(url).netloc
        except Exception:
            return ""

    def _scrape_webpage(self, url: str) -> str:
        """Scrape additional content from webpage"""
        try:
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
            response = requests.get(url, headers=headers, timeout=10)
            # Fail early on HTTP errors rather than parsing an error page
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'lxml')  # requires the lxml package

            # Drop elements that carry no useful visible text
            for element in soup(['script', 'style', 'nav', 'footer']):
                element.decompose()

            return ' '.join(soup.stripped_strings)
        except Exception as e:
            logging.error(f"Error scraping {url}: {e}")
            return ""

    def process_category(self, category: str) -> List[Dict]:
        """Process data for a single category"""
        input_file = RAW_DIR / f"{category}_results.json"

        try:
            with open(input_file, 'r') as f:
                raw_results = json.load(f)
        except Exception as e:
            logging.error(f"Error loading {input_file}: {e}")
            return []

        processed_results = []
        for result in raw_results:
            processed_result = {
                'title': result.get('title', ''),
                'snippet': result.get('snippet', ''),
                'url': result.get('link', ''),
                'domain': self._extract_domain(result.get('link', '')),
                'category': category
            }

            # Scrape full page text only for trusted local domains,
            # capping the stored content at 5,000 characters
            if any(domain in processed_result['domain']
                   for domain in ['visitbloomington.com', 'indiana.edu', 'bloomington.in.gov']):
                additional_content = self._scrape_webpage(processed_result['url'])
                processed_result['additional_content'] = additional_content[:5000]

            processed_results.append(processed_result)

        # Persist both JSON (for downstream scripts) and CSV (for inspection)
        output_file = PROCESSED_DIR / f"{category}_processed.json"
        with open(output_file, 'w') as f:
            json.dump(processed_results, f, indent=2)

        df = pd.DataFrame(processed_results)
        df.to_csv(PROCESSED_DIR / f"{category}_processed.csv", index=False)

        self.processed_data[category] = processed_results
        return processed_results

    def process_all_categories(self) -> Dict[str, List[Dict]]:
        """Process all categories"""
        # Derive category names from the raw filenames,
        # e.g. "food_results.json" -> "food"
        categories = [f.stem.replace('_results', '')
                      for f in RAW_DIR.glob('*_results.json')]

        for category in categories:
            logging.info(f"Processing category: {category}")
            self.process_category(category)

        # Flatten per-category results into one combined CSV
        all_results = []
        for category_results in self.processed_data.values():
            all_results.extend(category_results)

        combined_df = pd.DataFrame(all_results)
        combined_df.to_csv(PROCESSED_DIR / "all_processed.csv", index=False)

        # Summary statistics for the run. Guard the domain counts: an empty
        # DataFrame has no 'domain' column and would raise a KeyError
        stats = {
            'total_results': len(all_results),
            'results_per_category': {
                category: len(results)
                for category, results in self.processed_data.items()
            },
            'domains_distribution': (combined_df['domain'].value_counts().to_dict()
                                     if not combined_df.empty else {}),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        with open(PROCESSED_DIR / "processing_stats.json", 'w') as f:
            json.dump(stats, f, indent=2)

        return self.processed_data
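

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: run the full
    # pipeline end to end. Assumes config.py defines RAW_DIR, PROCESSED_DIR,
    # and LOG_DIR as existing pathlib.Path directories, and that RAW_DIR
    # already holds "<category>_results.json" files from an upstream search step.
    processor = DataProcessor()
    processed = processor.process_all_categories()
    for category, results in processed.items():
        print(f"{category}: {len(results)} results")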