import logging
import asyncio
from typing import Dict, Any, Optional
import tempfile
import os
from pathlib import Path
import uuid

from core.document_parser import DocumentParser
from core.chunker import TextChunker
from core.text_preprocessor import TextPreprocessor
from services.vector_store_service import VectorStoreService
from services.document_store_service import DocumentStoreService
from services.embedding_service import EmbeddingService
from services.ocr_service import OCRService

logger = logging.getLogger(__name__)


class IngestionTool:
    def __init__(self, vector_store: VectorStoreService, document_store: DocumentStoreService,
                 embedding_service: EmbeddingService, ocr_service: OCRService):
        self.vector_store = vector_store
        self.document_store = document_store
        self.embedding_service = embedding_service
        self.ocr_service = ocr_service

        self.document_parser = DocumentParser()
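        # Hand the OCR service to the parser so it can extract text
        # from scanned or image-based documents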
        self.document_parser.ocr_service = ocr_service

        self.text_chunker = TextChunker()
        self.text_preprocessor = TextPreprocessor()

    async def process_document(self, file_path: str, file_type: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process a document through the full ingestion pipeline"""
        if task_id is None:
            task_id = str(uuid.uuid4())

        try:
            logger.info(f"Starting document processing for {file_path}")

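            # Parse the source file into a Document model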
            filename = Path(file_path).name
            document = await self.document_parser.parse_document(file_path, filename)

            if not document.content:
                logger.warning(f"No content extracted from document {filename}")
                return {
                    "success": False,
                    "error": "No content could be extracted from the document",
                    "task_id": task_id
                }

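            # Persist the parsed document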
            await self.document_store.store_document(document)

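            # Split the content into chunks and embed them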
            chunks = await self._create_and_embed_chunks(document)

            if not chunks:
                logger.warning(f"No chunks created for document {document.id}")
                return {
                    "success": False,
                    "error": "Failed to create text chunks",
                    "task_id": task_id,
                    "document_id": document.id,
                    "filename": document.filename
                }

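            # Store the embedded chunks in the vector store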
            success = await self.vector_store.add_chunks(chunks)

            if not success:
                logger.error(f"Failed to store embeddings for document {document.id}")
                return {
                    "success": False,
                    "error": "Failed to store embeddings",
                    "task_id": task_id,
                    "document_id": document.id
                }

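            # Record the chunk count in the document metadata; a failure
            # here is logged but does not fail the ingestion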
            try:
                current_metadata = document.metadata or {}
                current_metadata["chunk_count"] = len(chunks)
                await self.document_store.update_document_metadata(
                    document.id,
                    {"metadata": current_metadata}
                )
            except Exception as e:
                logger.warning(f"Failed to update chunk count for document {document.id}: {e}")

            logger.info(f"Successfully processed document {document.id} with {len(chunks)} chunks")

            return {
                "success": True,
                "task_id": task_id,
                "document_id": document.id,
                "filename": document.filename,
                "chunks_created": len(chunks),
                "content_length": len(document.content),
                "doc_type": document.doc_type.value,
                "message": f"Successfully processed {filename}"
            }

        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id,
                "message": f"Failed to process document: {str(e)}"
            }

    async def _create_and_embed_chunks(self, document) -> list:
        """Create chunks and generate embeddings"""
        try:
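            # Split the document text into chunks using the recursive strategy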
            chunks = self.text_chunker.chunk_document(
                document.id,
                document.content,
                method="recursive"
            )

            if not chunks:
                return []

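            # Adjust the chunks so they are well sized for the embedding model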
            optimized_chunks = self.text_chunker.optimize_chunks_for_embedding(chunks)

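            # Embed all chunk texts in a single batch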
            texts = [chunk.content for chunk in optimized_chunks]
            embeddings = await self.embedding_service.generate_embeddings(texts)

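            # Attach each embedding to its chunk; chunks without an
            # embedding are dropped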
            embedded_chunks = []
            for i, chunk in enumerate(optimized_chunks):
                if i < len(embeddings):
                    chunk.embedding = embeddings[i]
                    embedded_chunks.append(chunk)

            return embedded_chunks

        except Exception as e:
            logger.error(f"Error creating and embedding chunks: {str(e)}")
            return []

    async def process_url(self, url: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process a document from a URL"""
        try:
            import requests
            from urllib.parse import urlparse

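            # Run the blocking download in a worker thread so it does not
            # stall the event loop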
            response = await asyncio.to_thread(requests.get, url, timeout=30)
            response.raise_for_status()

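            # Derive a filename from the URL path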
            parsed_url = urlparse(url)
            filename = Path(parsed_url.path).name or "downloaded_file"

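            # Stage the payload in a temporary file for the parser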
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
                tmp_file.write(response.content)
                tmp_file_path = tmp_file.name

            try:
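                # file_type is unused by process_document; the parser
                # works from the file and its name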
                result = await self.process_document(tmp_file_path, "", task_id)
                result["source_url"] = url
                return result
            finally:
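                # Always remove the temporary file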
                if os.path.exists(tmp_file_path):
                    os.unlink(tmp_file_path)

        except Exception as e:
            logger.error(f"Error processing URL {url}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4()),
                "source_url": url
            }

    async def process_text_content(self, content: str, filename: str = "text_content.txt",
                                   task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process raw text content directly"""
        try:
            from core.models import Document, DocumentType
            from datetime import datetime, timezone

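            # Wrap the raw text in a Document model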
            document = Document(
                id=str(uuid.uuid4()),
                filename=filename,
                content=content,
                doc_type=DocumentType.TEXT,
                file_size=len(content.encode('utf-8')),
                created_at=datetime.now(timezone.utc),
                metadata={
                    "source": "direct_text_input",
                    "content_length": len(content),
                    "word_count": len(content.split())
                }
            )

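            # Persist the document, then chunk and embed it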
            await self.document_store.store_document(document)

            chunks = await self._create_and_embed_chunks(document)

            if chunks:
                await self.vector_store.add_chunks(chunks)

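            # Record the chunk count in the document metadata; a failure
            # here is logged but does not fail the ingestion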
            try:
                current_metadata = document.metadata or {}
                current_metadata["chunk_count"] = len(chunks)
                await self.document_store.update_document_metadata(
                    document.id,
                    {"metadata": current_metadata}
                )
            except Exception as e:
                logger.warning(f"Failed to update chunk count for document {document.id}: {e}")

            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document.id,
                "filename": filename,
                "chunks_created": len(chunks),
                "content_length": len(content),
                "message": "Successfully processed text content"
            }

        except Exception as e:
            logger.error(f"Error processing text content: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4())
            }

    async def reprocess_document(self, document_id: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Reprocess an existing document (useful for updating embeddings)"""
        try:
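            # Fetch the stored document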
            document = await self.document_store.get_document(document_id)

            if not document:
                return {
                    "success": False,
                    "error": f"Document {document_id} not found",
                    "task_id": task_id or str(uuid.uuid4())
                }

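            # Drop the old chunks before re-embedding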
            await self.vector_store.delete_document(document_id)

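            # Re-chunk and re-embed the stored content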
            chunks = await self._create_and_embed_chunks(document)

            if chunks:
                await self.vector_store.add_chunks(chunks)

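            # Record the new chunk count; a failure here is logged but
            # does not fail the reprocess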
            try:
                current_metadata = document.metadata or {}
                current_metadata["chunk_count"] = len(chunks)
                await self.document_store.update_document_metadata(
                    document.id,
                    {"metadata": current_metadata}
                )
            except Exception as e:
                logger.warning(f"Failed to update chunk count for document {document.id}: {e}")

            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document_id,
                "filename": document.filename,
                "chunks_created": len(chunks),
                "message": f"Successfully reprocessed {document.filename}"
            }

        except Exception as e:
            logger.error(f"Error reprocessing document {document_id}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4()),
                "document_id": document_id
            }

    async def batch_process_directory(self, directory_path: str, task_id: Optional[str] = None) -> Dict[str, Any]:
        """Process multiple documents from a directory"""
        try:
            directory = Path(directory_path)
            if not directory.exists() or not directory.is_dir():
                return {
                    "success": False,
                    "error": f"Directory {directory_path} does not exist",
                    "task_id": task_id or str(uuid.uuid4())
                }

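            # File types the parsing pipeline can handle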
            supported_extensions = {'.txt', '.pdf', '.docx', '.png', '.jpg', '.jpeg', '.bmp', '.tiff'}

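            # Collect matching files; use a set because the lower- and
            # upper-case glob patterns can match the same file on a
            # case-insensitive filesystem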
            matched = set()
            for ext in supported_extensions:
                matched.update(directory.glob(f"*{ext}"))
                matched.update(directory.glob(f"*{ext.upper()}"))
            files_to_process = sorted(matched)

            if not files_to_process:
                return {
                    "success": False,
                    "error": "No supported files found in directory",
                    "task_id": task_id or str(uuid.uuid4())
                }

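            # Process each file sequentially, collecting per-file results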
            results = []
            successful = 0
            failed = 0

            for file_path in files_to_process:
                try:
                    result = await self.process_document(str(file_path), file_path.suffix)
                    results.append(result)

                    if result.get("success"):
                        successful += 1
                    else:
                        failed += 1

                except Exception as e:
                    failed += 1
                    results.append({
                        "success": False,
                        "error": str(e),
                        "filename": file_path.name
                    })

            return {
                "success": True,
                "task_id": task_id or str(uuid.uuid4()),
                "directory": str(directory),
                "total_files": len(files_to_process),
                "successful": successful,
                "failed": failed,
                "results": results,
                "message": f"Processed {successful}/{len(files_to_process)} files successfully"
            }

        except Exception as e:
            logger.error(f"Error batch processing directory {directory_path}: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "task_id": task_id or str(uuid.uuid4())
            }
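

# Example wiring (a minimal sketch; the zero-argument service constructors
# below are assumptions, since the real ones may take configuration):
#
#     async def main():
#         tool = IngestionTool(
#             vector_store=VectorStoreService(),
#             document_store=DocumentStoreService(),
#             embedding_service=EmbeddingService(),
#             ocr_service=OCRService(),
#         )
#         result = await tool.process_document("report.pdf", ".pdf")
#         print(result["message"])
#
#     asyncio.run(main())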