snikhilesh committed
Commit 59bbd50 · verified · 1 parent: 303942f

Deploy preprocessing_pipeline.py to backend/ directory

Files changed (1)
  1. backend/preprocessing_pipeline.py +514 -0
backend/preprocessing_pipeline.py ADDED
@@ -0,0 +1,514 @@
"""
Medical Preprocessing Pipeline - Phase 2
Central orchestration layer for medical file processing and extraction.

This module coordinates all preprocessing components, including file detection,
PHI de-identification, and modality-specific extraction, to produce structured
data for AI model processing.

Author: MiniMax Agent
Date: 2025-10-29
Version: 1.0.0
"""

import hashlib
import os
import json
import logging
import time
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict, is_dataclass
from pathlib import Path
import traceback

from file_detector import MedicalFileDetector, FileDetectionResult, MedicalFileType
from phi_deidentifier import MedicalPHIDeidentifier, DeidentificationResult, PHICategory
from pdf_extractor import MedicalPDFProcessor, ExtractionResult
from dicom_processor import DICOMProcessor, DICOMProcessingResult
from ecg_processor import ECGSignalProcessor, ECGProcessingResult
from medical_schemas import (
    ValidationResult, validate_document_schema, route_to_specialized_model,
    MedicalDocumentMetadata, ConfidenceScore
)

logger = logging.getLogger(__name__)


@dataclass
class ProcessingPipelineResult:
    """Result of the complete preprocessing pipeline."""
    document_id: str
    file_detection: FileDetectionResult
    deidentification_result: Optional[DeidentificationResult]
    extraction_result: Any  # Can be ExtractionResult, DICOMProcessingResult, or ECGProcessingResult
    structured_data: Dict[str, Any]
    validation_result: ValidationResult
    model_routing: Dict[str, Any]
    processing_time: float
    pipeline_metadata: Dict[str, Any]


class MedicalPreprocessingPipeline:
    """Main preprocessing pipeline for medical documents."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or self._default_config()

        # Initialize components
        self.file_detector = MedicalFileDetector()
        self.phi_deidentifier = MedicalPHIDeidentifier(self.config.get('phi_config', {}))
        self.pdf_processor = MedicalPDFProcessor()
        self.dicom_processor = DICOMProcessor()
        self.ecg_processor = ECGSignalProcessor()

        # Pipeline statistics
        self.stats = {
            "total_processed": 0,
            "successful_processing": 0,
            "phi_deidentified": 0,
            "validation_passed": 0,
            "processing_times": [],
            "error_counts": {}
        }

        logger.info("Medical Preprocessing Pipeline initialized")

    def _default_config(self) -> Dict[str, Any]:
        """Default pipeline configuration."""
        return {
            "enable_phi_deidentification": True,
            "enable_validation": True,
            "enable_model_routing": True,
            "max_file_size_mb": 100,
            "supported_formats": [".pdf", ".dcm", ".dicom", ".xml", ".scp", ".csv"],
            "phi_config": {
                "compliance_level": "HIPAA",
                "use_hashing": True,
                "redaction_method": "placeholder"
            },
            "validation_strict_mode": False,
            "output_format": "schema_compliant"
        }
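
    # Illustrative config override (not part of the original API surface): a
    # caller-supplied dict replaces these defaults wholesale rather than being
    # merged with them, e.g.
    #
    #     pipeline = MedicalPreprocessingPipeline({
    #         **MedicalPreprocessingPipeline()._default_config(),
    #         "validation_strict_mode": True,  # hypothetical override
    #     })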

    def process_document(self, file_path: str, document_type: str = "auto") -> ProcessingPipelineResult:
        """
        Process a single medical document through the complete pipeline.

        Args:
            file_path: Path to the medical document
            document_type: Document type hint ("auto", "radiology", "laboratory", etc.)

        Returns:
            ProcessingPipelineResult with complete processing results
        """
        start_time = time.time()
        document_id = self._generate_document_id(file_path)

        try:
            logger.info(f"Starting processing pipeline for document: {file_path}")

            # Step 1: File Detection and Analysis
            file_detection = self._detect_and_analyze_file(file_path)

            # Step 2: PHI De-identification (if enabled and needed)
            deidentification_result = None
            if self.config["enable_phi_deidentification"]:
                deidentification_result = self._perform_phi_deidentification(file_path, file_detection)

            # Step 3: Extract Structured Data
            extraction_result = self._extract_structured_data(file_path, file_detection, document_type)

            # Step 4: Validate Against Schema
            validation_result = self._validate_extracted_data(extraction_result)

            # Step 5: Model Routing
            model_routing = self._determine_model_routing(extraction_result, validation_result)

            # Step 6: Compile Final Results
            processing_time = time.time() - start_time

            pipeline_metadata = {
                "pipeline_version": "1.0.0",
                "processing_timestamp": time.time(),
                "file_size": os.path.getsize(file_path) if os.path.exists(file_path) else 0,
                "config_used": self.config
            }

            result = ProcessingPipelineResult(
                document_id=document_id,
                file_detection=file_detection,
                deidentification_result=deidentification_result,
                extraction_result=extraction_result,
                structured_data=self._compile_structured_data(extraction_result, deidentification_result),
                validation_result=validation_result,
                model_routing=model_routing,
                processing_time=processing_time,
                pipeline_metadata=pipeline_metadata
            )

            # Update statistics
            self._update_statistics(result, True)

            logger.info(f"Pipeline processing completed successfully in {processing_time:.2f}s")
            return result

        except Exception as e:
            logger.error(f"Pipeline processing failed: {str(e)}")

            # Create error result; record the exception type so that
            # _update_statistics can build a meaningful error breakdown.
            error_result = ProcessingPipelineResult(
                document_id=document_id,
                file_detection=FileDetectionResult(
                    file_type=MedicalFileType.UNKNOWN,
                    confidence=0.0,
                    detected_features=["processing_error"],
                    mime_type="application/octet-stream",
                    file_size=0,
                    metadata={"error": str(e)},
                    recommended_extractor="error_handler"
                ),
                deidentification_result=None,
                extraction_result=None,
                structured_data={
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "traceback": traceback.format_exc()
                },
                validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]),
                model_routing={"error": str(e)},
                processing_time=time.time() - start_time,
                pipeline_metadata={"error": str(e), "processing_timestamp": time.time()}
            )

            # Update statistics
            self._update_statistics(error_result, False)

            return error_result

    def _detect_and_analyze_file(self, file_path: str) -> FileDetectionResult:
        """Detect file type and characteristics."""
        try:
            result = self.file_detector.detect_file_type(file_path)
            logger.info(f"File detected: {result.file_type.value} (confidence: {result.confidence:.2f})")
            return result
        except Exception as e:
            logger.error(f"File detection error: {str(e)}")
            raise

    def _perform_phi_deidentification(self, file_path: str,
                                      file_detection: FileDetectionResult) -> Optional[DeidentificationResult]:
        """Perform PHI de-identification if needed."""
        try:
            # Map detected file types onto the document types the PHI engine understands
            doc_type_mapping = {
                MedicalFileType.PDF_CLINICAL: "clinical_notes",
                MedicalFileType.PDF_RADIOLOGY: "radiology",
                MedicalFileType.PDF_LABORATORY: "laboratory",
                MedicalFileType.PDF_ECG_REPORT: "ecg",
                MedicalFileType.DICOM_CT: "radiology",
                MedicalFileType.DICOM_MRI: "radiology",
                MedicalFileType.DICOM_XRAY: "radiology",
                MedicalFileType.DICOM_ULTRASOUND: "radiology",
                MedicalFileType.ECG_XML: "ecg",
                MedicalFileType.ECG_SCPE: "ecg",
                MedicalFileType.ECG_CSV: "ecg"
            }

            doc_type = doc_type_mapping.get(file_detection.file_type, "general")

            # Read file content for PHI detection. Binary formats (PDF, DICOM)
            # are read with decoding errors ignored, so only whatever embedded
            # text survives decoding is scanned here.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            if content:
                result = self.phi_deidentifier.deidentify_text(content, doc_type)
                logger.info(f"PHI de-identification completed: {len(result.phi_matches)} PHI entities found")
                return result
            else:
                logger.warning("No text content found for PHI de-identification")
                return None

        except Exception as e:
            logger.error(f"PHI de-identification error: {str(e)}")
            return None

    def _extract_structured_data(self, file_path: str, file_detection: FileDetectionResult,
                                 document_type: str) -> Any:
        """Extract structured data based on file type."""
        try:
            # Route to the appropriate extractor based on file type
            if file_detection.file_type in [MedicalFileType.PDF_CLINICAL, MedicalFileType.PDF_RADIOLOGY,
                                            MedicalFileType.PDF_LABORATORY, MedicalFileType.PDF_ECG_REPORT]:
                # PDF processing
                doc_type = "unknown"
                if file_detection.file_type == MedicalFileType.PDF_RADIOLOGY:
                    doc_type = "radiology"
                elif file_detection.file_type == MedicalFileType.PDF_LABORATORY:
                    doc_type = "laboratory"
                elif file_detection.file_type == MedicalFileType.PDF_ECG_REPORT:
                    doc_type = "ecg_report"
                elif file_detection.file_type == MedicalFileType.PDF_CLINICAL:
                    doc_type = "clinical_notes"

                result = self.pdf_processor.process_pdf(file_path, doc_type)
                logger.info(f"PDF processing completed: {result.extraction_method}")
                return result

            elif file_detection.file_type in [MedicalFileType.DICOM_CT, MedicalFileType.DICOM_MRI,
                                              MedicalFileType.DICOM_XRAY, MedicalFileType.DICOM_ULTRASOUND]:
                # DICOM processing
                result = self.dicom_processor.process_dicom_file(file_path)
                logger.info(f"DICOM processing completed: {result.modality}")
                return result

            elif file_detection.file_type in [MedicalFileType.ECG_XML, MedicalFileType.ECG_SCPE,
                                              MedicalFileType.ECG_CSV]:
                # ECG processing
                format_mapping = {
                    MedicalFileType.ECG_XML: "xml",
                    MedicalFileType.ECG_SCPE: "scp",
                    MedicalFileType.ECG_CSV: "csv"
                }
                ecg_format = format_mapping.get(file_detection.file_type, "auto")

                result = self.ecg_processor.process_ecg_file(file_path, ecg_format)
                logger.info(f"ECG processing completed: {len(result.lead_names)} leads")
                return result

            else:
                raise ValueError(f"No appropriate extractor for file type: {file_detection.file_type}")

        except Exception as e:
            logger.error(f"Data extraction error: {str(e)}")
            raise

    def _validate_extracted_data(self, extraction_result: Any) -> ValidationResult:
        """Validate extracted data against medical schemas."""
        if not self.config["enable_validation"]:
            return ValidationResult(is_valid=True, compliance_score=1.0)

        try:
            # Convert the extraction result to dictionary form
            if hasattr(extraction_result, 'structured_data'):
                # PDF extraction result
                structured_data = extraction_result.structured_data
            elif hasattr(extraction_result, 'metadata') and hasattr(extraction_result, 'confidence_score'):
                # DICOM or ECG processing result
                structured_data = asdict(extraction_result)
            else:
                structured_data = {"raw_data": extraction_result}

            # Determine the document type from the extraction result
            doc_type = "unknown"
            if "document_type" in structured_data:
                doc_type = structured_data["document_type"]
            elif "modality" in structured_data:
                doc_type = "radiology"
            elif "signal_data" in structured_data:
                doc_type = "ECG"

            # Add metadata for validation
            if "metadata" not in structured_data:
                structured_data["metadata"] = {
                    "source_type": doc_type,
                    "extraction_timestamp": time.time()
                }

            # Validate against the schema
            validation_result = validate_document_schema(structured_data)

            if validation_result.is_valid:
                logger.info(f"Schema validation passed: {doc_type}")
            else:
                logger.warning(f"Schema validation failed: {validation_result.validation_errors}")

            return validation_result

        except Exception as e:
            logger.error(f"Validation error: {str(e)}")
            return ValidationResult(
                is_valid=False,
                validation_errors=[str(e)],
                compliance_score=0.0
            )
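
    # Worked example for the thresholds applied below: a compliance_score of
    # 0.70 meets the confidence threshold (> 0.6) but still requires human
    # review (< 0.85); only scores of 0.85 or higher bypass review.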
    def _determine_model_routing(self, extraction_result: Any,
                                 validation_result: ValidationResult) -> Dict[str, Any]:
        """Determine appropriate AI model routing."""
        if not self.config["enable_model_routing"]:
            return {"routing_disabled": True}

        try:
            # Extract document data for the routing decision
            if hasattr(extraction_result, 'structured_data'):
                structured_data = extraction_result.structured_data
            else:
                structured_data = asdict(extraction_result)

            # Use the schema routing function
            recommended_model = route_to_specialized_model(structured_data)

            routing_info = {
                "recommended_model": recommended_model,
                "validation_passed": validation_result.is_valid,
                "confidence_threshold_met": validation_result.compliance_score > 0.6,
                "requires_human_review": validation_result.compliance_score < 0.85,
                "routing_confidence": validation_result.compliance_score
            }

            logger.info(f"Model routing: {recommended_model} (confidence: {validation_result.compliance_score:.2f})")
            return routing_info

        except Exception as e:
            logger.error(f"Model routing error: {str(e)}")
            return {"error": str(e), "fallback_model": "generic_processor"}

    def _compile_structured_data(self, extraction_result: Any,
                                 deidentification_result: Optional[DeidentificationResult]) -> Dict[str, Any]:
        """Compile the final structured data output."""
        try:
            # Start with the extraction result
            if hasattr(extraction_result, 'structured_data'):
                structured_data = extraction_result.structured_data.copy()
            else:
                structured_data = asdict(extraction_result)

            # Add de-identification information
            if deidentification_result:
                structured_data["phi_deidentification"] = {
                    "phi_entities_removed": len(deidentification_result.phi_matches),
                    "deidentification_method": deidentification_result.anonymization_method,
                    "original_hash": deidentification_result.hash_original,
                    "compliance_level": deidentification_result.compliance_level
                }

            # Add extraction metadata
            if hasattr(extraction_result, 'metadata'):
                structured_data["extraction_metadata"] = extraction_result.metadata

            # Add confidence scores
            if hasattr(extraction_result, 'confidence_scores'):
                structured_data["extraction_confidence"] = extraction_result.confidence_scores

            return structured_data

        except Exception as e:
            logger.error(f"Data compilation error: {str(e)}")
            return {"error": str(e)}

    def _generate_document_id(self, file_path: str) -> str:
        """Generate a unique document ID from the file's path, size, and mtime."""
        file_stat = os.stat(file_path)
        identifier = f"{file_path}_{file_stat.st_size}_{file_stat.st_mtime}"
        return hashlib.md5(identifier.encode()).hexdigest()[:12]

    def _update_statistics(self, result: ProcessingPipelineResult, success: bool):
        """Update pipeline statistics."""
        self.stats["total_processed"] += 1

        if success:
            self.stats["successful_processing"] += 1

        if result.deidentification_result:
            self.stats["phi_deidentified"] += 1

        if result.validation_result.is_valid:
            self.stats["validation_passed"] += 1

        self.stats["processing_times"].append(result.processing_time)

        # Track errors by the exception type recorded in the error handler
        if not success:
            error_type = result.structured_data.get("error_type", "UnknownError")
            self.stats["error_counts"][error_type] = self.stats["error_counts"].get(error_type, 0) + 1

    def get_pipeline_statistics(self) -> Dict[str, Any]:
        """Get comprehensive pipeline statistics."""
        processing_times = self.stats["processing_times"]

        return {
            "total_documents_processed": self.stats["total_processed"],
            "successful_processing_rate": self.stats["successful_processing"] / max(self.stats["total_processed"], 1),
            "phi_deidentification_rate": self.stats["phi_deidentified"] / max(self.stats["total_processed"], 1),
            "validation_pass_rate": self.stats["validation_passed"] / max(self.stats["total_processed"], 1),
            "average_processing_time": sum(processing_times) / len(processing_times) if processing_times else 0,
            "error_breakdown": self.stats["error_counts"],
            "pipeline_health": "healthy" if self.stats["successful_processing"] > self.stats["total_processed"] * 0.9 else "degraded"
        }
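
    # Illustrative batch usage (hypothetical file paths):
    #
    #     results = pipeline.batch_process(["report_a.pdf", "scan_b.dcm"],
    #                                      ["auto", "radiology"])
    #     failed = [r for r in results if not r.validation_result.is_valid]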
    def batch_process(self, file_paths: List[str], document_types: Optional[List[str]] = None) -> List[ProcessingPipelineResult]:
        """Process multiple documents in batch."""
        if document_types is None:
            document_types = ["auto"] * len(file_paths)

        results = []

        for i, (file_path, doc_type) in enumerate(zip(file_paths, document_types)):
            logger.info(f"Processing batch document {i+1}/{len(file_paths)}: {file_path}")

            try:
                result = self.process_document(file_path, doc_type)
                results.append(result)
            except Exception as e:
                logger.error(f"Batch processing error for {file_path}: {str(e)}")
                # Create an error result for this document and continue with the batch
                error_result = ProcessingPipelineResult(
                    document_id=self._generate_document_id(file_path),
                    file_detection=FileDetectionResult(
                        file_type=MedicalFileType.UNKNOWN,
                        confidence=0.0,
                        detected_features=["batch_error"],
                        mime_type="application/octet-stream",
                        file_size=0,
                        metadata={"error": str(e)},
                        recommended_extractor="error_handler"
                    ),
                    deidentification_result=None,
                    extraction_result=None,
                    structured_data={"error": str(e), "error_type": type(e).__name__, "batch_processing_failed": True},
                    validation_result=ValidationResult(is_valid=False, validation_errors=[str(e)]),
                    model_routing={"error": str(e)},
                    processing_time=0.0,
                    pipeline_metadata={"batch_position": i, "error": str(e)}
                )
                results.append(error_result)

        logger.info(f"Batch processing completed: {len(results)} documents processed")
        return results

    def export_pipeline_result(self, result: ProcessingPipelineResult, output_path: str):
        """Export a pipeline result to a JSON file."""
        try:
            export_data = {
                "document_id": result.document_id,
                "file_detection": asdict(result.file_detection),
                "deidentification_result": asdict(result.deidentification_result) if result.deidentification_result else None,
                "extraction_result": self._serialize_extraction_result(result.extraction_result),
                "structured_data": result.structured_data,
                "validation_result": asdict(result.validation_result),
                "model_routing": result.model_routing,
                "processing_time": result.processing_time,
                "pipeline_metadata": result.pipeline_metadata,
                "export_timestamp": time.time()
            }

            with open(output_path, 'w') as f:
                json.dump(export_data, f, indent=2, default=str)

            logger.info(f"Pipeline result exported to: {output_path}")

        except Exception as e:
            logger.error(f"Export error: {str(e)}")

    def _serialize_extraction_result(self, extraction_result: Any) -> Dict[str, Any]:
        """Serialize an extraction result for JSON export."""
        try:
            if is_dataclass(extraction_result):
                return asdict(extraction_result)
            elif hasattr(extraction_result, '__dict__'):
                return dict(extraction_result.__dict__)
            else:
                return {"data": extraction_result}
        except Exception:
            return {"error": "Could not serialize extraction result"}


# Export main classes
__all__ = [
    "MedicalPreprocessingPipeline",
    "ProcessingPipelineResult"
]
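
A minimal usage sketch (illustrative, not part of the commit; assumes the sibling backend modules are importable and "sample_report.pdf" is a hypothetical input file):

    from preprocessing_pipeline import MedicalPreprocessingPipeline

    pipeline = MedicalPreprocessingPipeline()
    result = pipeline.process_document("sample_report.pdf")

    if result.validation_result.is_valid:
        pipeline.export_pipeline_result(result, "sample_report.result.json")

    print(pipeline.get_pipeline_statistics())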