Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Record tactic detection results from response analysis JSON files. | |
| Reads all *_response_analysis.json files under final_response/model_name/tactic_name/timestamp/ | |
| and records, for each file, whether the tactic named by its folder appears in the analysis (1 or 0). | |
| Usage: | |
| python count_tactics.py [--output OUTPUT_PATH] | |
| """ | |
| import argparse | |
| import json | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Dict, Any | |
def find_project_root(start: Path) -> Path:
    """Locate the project root at or above ``start``.

    ``start`` itself is examined first, then each ancestor from nearest to
    farthest. The first directory that contains a ``final_response``, ``src``
    or ``.git`` entry is returned. If none qualifies, ``start.parent`` is
    returned as a fallback.
    """
    marker_names = ('final_response', 'src', '.git')
    candidates = (start, *start.parents)
    return next(
        (
            folder
            for folder in candidates
            if any((folder / name).exists() for name in marker_names)
        ),
        start.parent,
    )
# Tactic folder names accepted by this script, intended to mirror the eight
# tactic folders of the Mordor dataset: collection, credential_access,
# defense_evasion, discovery, execution, lateral_movement, persistence and
# privilege_escalation.
# The misspelled "persistance" is retained as a legacy alias so that result
# folders already created under that name are still processed.
# NOTE(review): confirm the exact folder names against the dataset checkout.
ALLOWED_TACTICS = {
    "collection", "credential_access", "defense_evasion", "discovery",
    "execution", "lateral_movement", "persistence", "privilege_escalation",
    "persistance",
}
def detect_tactic_in_json(path: Path, target_tactic: str) -> int:
    """Report whether ``target_tactic`` is listed anywhere in a JSON file.

    The parsed document is searched recursively. A match is any dict, at any
    depth, whose ``"tactic"`` key holds a list containing ``target_tactic``.
    A ``"tactic"`` value that is not a list never counts as a match by itself.

    Returns:
        1 if at least one match exists, otherwise 0. Any error while reading
        or parsing the file is reported with a ``[WARNING]`` line on stdout
        and also yields 0.
    """
    def _mentions_target(node) -> bool:
        """Return True when ``node`` or anything nested in it lists the target."""
        if isinstance(node, list):
            children = node
        elif isinstance(node, dict):
            tagged = node.get("tactic")
            if isinstance(tagged, list) and target_tactic in tagged:
                return True
            # Every value is searched, including the "tactic" list itself.
            children = node.values()
        else:
            # Scalars cannot contain a tactic list.
            return False
        for child in children:
            if _mentions_target(child):
                return True
        return False

    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
        found = _mentions_target(parsed)
    except Exception as exc:
        print(f"[WARNING] Error reading {path}: {exc}")
        return 0
    return 1 if found else 0
def extract_total_events_analyzed(path: Path) -> int:
    """Read the analysed-event count stored in a response analysis JSON file.

    The first location that exists wins, in this order:

    1. top-level ``total_events_analyzed``
    2. ``correlation_analysis.total_events_analyzed``
    3. ``metadata.total_events_analyzed``
    4. ``metadata.total_abnormal_events``

    The stored value is returned as-is. 0 is returned when the document is
    not a JSON object, when none of the locations exist, or when the file
    cannot be read or parsed.
    """
    # (section, keys): section None means the top-level object itself.
    lookups = (
        (None, ("total_events_analyzed",)),
        ("correlation_analysis", ("total_events_analyzed",)),
        ("metadata", ("total_events_analyzed", "total_abnormal_events")),
    )
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(payload, dict):
            return 0
        for section, keys in lookups:
            holder = payload if section is None else payload.get(section)
            if not isinstance(holder, dict):
                continue
            for key in keys:
                if key in holder:
                    return holder[key]
        return 0
    except Exception:
        return 0
def find_response_analysis_files(base_path: Path) -> list:
    """Collect response analysis JSON files from a model/tactic/timestamp tree.

    The expected layout is
    ``base_path/model_name/tactic_name/timestamp/*_response_analysis.json``.
    Only files located exactly three directory levels below ``base_path`` are
    returned; non-directory entries at the first three levels are ignored.

    Args:
        base_path: Directory whose immediate children are the model folders.

    Returns:
        A list of dicts, one per matching file, with the keys ``json_path``
        (the file's ``Path``), ``tactic_label`` (name of the second-level
        folder) and ``model_name`` (name of the first-level folder). Entries
        are ordered by model folder, tactic folder, timestamp folder and
        finally file path, so the order is the same on every run.
    """
    def subdirectories(folder: Path) -> list:
        """Return the immediate sub-directories of ``folder`` in sorted order."""
        return [entry for entry in sorted(folder.iterdir()) if entry.is_dir()]

    results = []
    for model_folder in subdirectories(base_path):
        for tactic_folder in subdirectories(model_folder):
            for timestamp_folder in subdirectories(tactic_folder):
                # The glob results are sorted like every directory level
                # above, so several analysis files in one timestamp folder
                # come back in a stable order rather than in whatever order
                # the filesystem lists them.
                for json_file in sorted(timestamp_folder.glob('*_response_analysis.json')):
                    results.append({
                        'json_path': json_file,
                        'tactic_label': tactic_folder.name,
                        'model_name': model_folder.name,
                    })
    return results
def main():
    """Scan ``final_response`` and write a per-file tactic detection summary.

    Steps:
      1. Parse ``--output`` (path of the JSON summary to write).
      2. Locate ``final_response`` under the project root found from this
         script's directory.
      3. For every response analysis file whose tactic folder name is in
         ``ALLOWED_TACTICS``, record whether that tactic is detected in the
         file (1/0) and the event count stored in it.
      4. Write the summary JSON and print the overall detection rate.

    Returns:
        0 on success; 1 when ``final_response`` does not exist or contains
        no response analysis files.
    """
    cli = argparse.ArgumentParser(
        description="Count tactic occurrences in response analysis files"
    )
    cli.add_argument(
        "--output",
        default="full_pipeline_evaluation/results/tactic_counts_summary.json",
        help="Output file for summary results"
    )
    args = cli.parse_args()

    # Resolve final_response relative to the project root, not the CWD.
    script_dir = Path(__file__).resolve().parent
    final_response_dir = find_project_root(script_dir) / "final_response"
    if not final_response_dir.exists():
        print(f"[ERROR] final_response directory not found at: {final_response_dir}")
        print("Run execute_pipeline.py first to generate analysis results")
        return 1

    banner = "=" * 80
    print(banner)
    print("COUNTING TACTIC OCCURRENCES")
    print(banner)
    print(f"Scanning: {final_response_dir}")
    print(f"Allowed tactics: {', '.join(sorted(ALLOWED_TACTICS))}")
    print()

    discovered = find_response_analysis_files(final_response_dir)
    if not discovered:
        print("[ERROR] No response analysis JSON files found")
        print("Expected structure: final_response/model_name/tactic_name/timestamp/*_response_analysis.json")
        return 1
    print(f"Found {len(discovered)} response analysis files\n")

    results = []
    for entry in discovered:
        json_path = entry['json_path']
        model_name = entry['model_name']
        # The tactic folder name doubles as the tactic to look for.
        target_tactic = entry['tactic_label']

        if target_tactic not in ALLOWED_TACTICS:
            print(f"[WARNING] Unknown tactic '{target_tactic}' in folder name, skipping...")
            continue

        tactic_detected = detect_tactic_in_json(json_path, target_tactic)
        total_events = extract_total_events_analyzed(json_path)
        results.append({
            "file": str(json_path.relative_to(final_response_dir)),
            "model": model_name,
            "tactic": target_tactic,
            "tactic_detected": tactic_detected,
            "total_abnormal_events_detected": total_events
        })

        if tactic_detected == 1:
            status = "DETECTED"
        else:
            status = "NOT DETECTED"
        print(f" {model_name}/{target_tactic}/{json_path.parent.name}/{json_path.name}")
        print(f" Status: {status}, Events analyzed: {total_events}")

    # Persist the summary before printing the closing report.
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    summary = {
        "timestamp": datetime.now().isoformat(),
        "total_files_processed": len(results),
        "results": results
    }
    serialized = json.dumps(summary, indent=2)
    output_path.write_text(serialized, encoding="utf-8")

    total_files = len(results)
    total_detected = len([row for row in results if row['tactic_detected'] == 1])
    if total_files > 0:
        detection_rate = total_detected / total_files * 100
    else:
        # Every file was skipped; avoid dividing by zero.
        detection_rate = 0

    print("\n" + banner)
    print("TACTIC COUNTING COMPLETE")
    print(banner)
    print(f"Processed: {total_files} files")
    print(f"Tactics detected: {total_detected}/{total_files} ({detection_rate:.1f}%)")
    print(f"Output: {output_path}")
    print(banner + "\n")
    return 0
if __name__ == "__main__":
    # The bare ``exit`` name is added by the ``site`` module for interactive
    # sessions and is missing when Python runs with -S, so the exit status
    # from main() is propagated with SystemExit, which is always available.
    raise SystemExit(main())