# cuga-agent: src/cuga/evaluation/evaluate_cuga.py
from cuga.backend.activity_tracker.tracker import ActivityTracker
from cuga.backend.cuga_graph.utils.controller import AgentRunner, ExperimentResult
from cuga.evaluation.langfuse.get_langfuse_data import LangfuseTraceHandler
from loguru import logger
import traceback
from pydantic import BaseModel
from typing import List, Dict, Iterable, Any, Optional, Tuple
import json
import csv
from calculate_test_score import evaluate_test_and_details, TestScore, TestScoreDetails, ToolCall
from statistics import mean
from pathlib import Path
import os
tracker = ActivityTracker()
class ExpectedOutput(BaseModel):
"""
The expected output a test case
"""
response: str
keywords: List[str]
tool_calls: List[ToolCall]
class TestCase(BaseModel):
"""
This is the model for your test cases, i.e. the input you give the evaluation loop
"""
app: str
name: str
description: str
intent: str
expected_output: ExpectedOutput
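# Illustrative example of a test case built from the models above (the app, intent,
# and tool names are made up, not taken from a real test suite):
#
#   example_case = TestCase(
#       app="digital_sales",
#       name="list_my_accounts",
#       description="Ask the agent to list the current accounts",
#       intent="Show me my current accounts",
#       expected_output=ExpectedOutput(
#           response="Here are your current accounts ...",
#           keywords=["accounts"],
#           tool_calls=[ToolCall(name="get_my_accounts", args={})],
#       ),
#   )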
class TestResult(BaseModel):
"""
The evaluation loop output of a run on a single test case
"""
app: str
index: int
test_name: str
score: TestScore
details: TestScoreDetails
def dict_subset_with_reason(sup: Dict, sub: Dict, path="") -> List[str]:
    """Return the reasons why `sub` is not a (recursive) subset of `sup`; an empty list means it is."""
reasons = []
for k, v in sub.items():
if k not in sup:
reasons.append(f"Missing key '{path + k}'")
else:
sv = sup[k]
if isinstance(v, dict) and isinstance(sv, dict):
reasons.extend(dict_subset_with_reason(sv, v, path + k + "."))
elif sv != v:
reasons.append(f"Value mismatch at '{path + k}': expected {v}, got {sv}")
return reasons
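# A quick sketch of the semantics (values chosen for illustration only):
#
#   dict_subset_with_reason({"a": 1, "b": {"c": 2}}, {"b": {"c": 2}})
#   -> []                                                # sub is a subset of sup
#   dict_subset_with_reason({"a": 1}, {"a": 2})
#   -> ["Value mismatch at 'a': expected 2, got 1"]
#   dict_subset_with_reason({}, {"x": 1})
#   -> ["Missing key 'x'"]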
def compare_toolcalls(a_list: Iterable[ToolCall], b_list: Iterable[ToolCall]) -> List[str]:
    """Return reasons why ToolCalls in `a_list` lack a match in `b_list` (match = name substring + args subset); empty means all matched."""
    all_reasons = []
for a in a_list:
matched = False
for b in b_list:
if b.name in a.name:
reasons = dict_subset_with_reason(a.args, b.args)
if not reasons: # perfect match
matched = True
break
if not matched:
if not any(b.name in a.name for b in b_list):
all_reasons.append(f"No ToolCall in B has name substring matching '{a.name}'")
else:
all_reasons.append(f"Args mismatch for ToolCall '{a.name}'")
for b in b_list:
if b.name in a.name:
mismatch = dict_subset_with_reason(a.args, b.args)
if mismatch:
all_reasons.extend([f" vs B({b.name}): {r}" for r in mismatch])
return all_reasons
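# Illustrative usage (tool names and args are made up):
#
#   a = [ToolCall(name="crm.create_lead", args={"name": "Acme", "stage": "new"})]
#   compare_toolcalls(a, [ToolCall(name="create_lead", args={"name": "Acme"})])
#   -> []   # "create_lead" is a substring of "crm.create_lead" and its args are a subset
#
#   compare_toolcalls(a, [ToolCall(name="create_lead", args={"name": "Other"})])
#   -> an "Args mismatch for ToolCall 'crm.create_lead'" entry followed by the
#      per-key reasons from dict_subset_with_reason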
def parse_test_cases(json_file_path: str) -> Dict[str, List[TestCase]]:
    """Parse a JSON test file into TestCase objects, grouped by app name."""
    # Resolve the path: absolute paths are used as-is, relative paths are resolved against the current working directory
path = Path(json_file_path)
if not path.is_absolute():
path = Path.cwd() / path
with open(path, 'r') as f:
data = json.load(f)
test_cases = {}
for app in data:
for test_case_data in app['test_cases']:
            # Use the test case's intent, defaulting to an empty string when it is falsy
            intent = test_case_data['intent'] if test_case_data['intent'] else ""
# Parse tool calls
tool_calls = [
ToolCall(name=call['name'], args=call['args'])
for call in test_case_data['expected_output']['tool_calls']
]
# Parse expected output
expected_output = ExpectedOutput(
response=test_case_data['expected_output']['response'],
keywords=test_case_data['expected_output']['keywords'],
tool_calls=tool_calls,
)
# Create TestCase object
test_case = TestCase(
app=app['name'],
name=test_case_data['name'],
description=test_case_data['description'],
intent=intent,
expected_output=expected_output,
)
if app['name'] not in test_cases:
test_cases[app['name']] = []
test_cases[app['name']].append(test_case)
return test_cases
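# The parser above implies a test file shaped roughly like the following. Only the
# keys are taken from the code; every value is an illustrative placeholder:
#
#   [
#     {
#       "name": "digital_sales",
#       "test_cases": [
#         {
#           "name": "list_my_accounts",
#           "description": "Ask the agent to list the current accounts",
#           "intent": "Show me my current accounts",
#           "expected_output": {
#             "response": "Here are your current accounts ...",
#             "keywords": ["accounts"],
#             "tool_calls": [{"name": "get_my_accounts", "args": {}}]
#           }
#         }
#       ]
#     }
#   ]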
async def run_cuga(test_file_path: str, result_file_path: str) -> Tuple[Dict[str, List[TestCase]], List[ExperimentResult]]:
    """Run every test case through the agent and return the parsed test cases plus the raw experiment results."""
    test_cases = parse_test_cases(test_file_path)
    total_cases = sum(len(cases) for cases in test_cases.values())
    print(f"test cases: {total_cases}\napps: {list(test_cases.keys())}")
agent_runner = AgentRunner(browser_enabled=False)
results = []
for app in test_cases:
        task_ids = [f"{app}_{i}" for i in range(len(test_cases[app]))]
tracker.start_experiment(task_ids=task_ids, experiment_name=app, description="")
for i, task in enumerate(test_cases[app]):
try:
tracker.reset(intent=task.intent, task_id=f"{app}_{str(i)}")
result = await agent_runner.run_task_generic(
eval_mode=False, goal=task.intent, current_datetime=tracker.current_date
)
# Reset variables after task completion using the current state
state = agent_runner.get_current_state()
state.variables_manager.reset()
results.append(result)
parsed_results = parse_test_results([task], [result])
save_test_results(parsed_results, result_file_path)
# Extract langfuse trace ID (applicable only if `langfuse_tracing=true` in settings)
langfuse_trace_id = agent_runner.agent_loop_obj.get_langfuse_trace_id()
langfuse_handler = LangfuseTraceHandler(langfuse_trace_id)
langfuse_data = await langfuse_handler.get_langfuse_data()
tracker.finish_task(
intent=task.intent,
site="",
task_id=f"{app}_{str(i)}",
eval="",
score=mean(
[
parsed_results[0].score.keyword_score,
parsed_results[0].score.response_score,
parsed_results[0].score.tool_call_score,
]
),
agent_answer=result.answer,
exception=False,
agent_v="",
total_llm_calls=langfuse_data.total_llm_calls if langfuse_data else None,
total_tokens=langfuse_data.total_tokens if langfuse_data else None,
total_cost=langfuse_data.total_cost if langfuse_data else None,
total_cache_input_tokens=langfuse_data.total_cache_input_tokens
if langfuse_data
else None,
)
except Exception as e:
                results.append(ExperimentResult(answer=f"Error: {e}", score=0, messages=[], steps=[]))
tracker.finish_task(
intent=task.intent,
site="",
task_id=f"{app}_{str(i)}",
eval="",
score=0,
agent_answer=f"Error: {e}",
exception=True,
agent_v="",
)
logger.error(traceback.format_exc())
logger.error(e)
return test_cases, results
def parse_test_results(
    test_cases: List[TestCase], experiment_results: List[ExperimentResult]
) -> List[TestResult]:
    """Score each experiment result against its test case and return one TestResult per pair."""
if len(test_cases) != len(experiment_results):
raise ValueError(f"Mismatch: {len(test_cases)} test cases vs {len(experiment_results)} results")
results = []
for i, (test_case, experiment_result) in enumerate(zip(test_cases, experiment_results)):
# Get answer text (handle None case)
answer = experiment_result.answer or ""
keywords = test_case.expected_output.keywords
        expected_tools = list(test_case.expected_output.tool_calls)
tool_calls = []
for call in [step for step in experiment_result.steps if "api_call" in step.name]:
call_json = json.loads(call.data)
tool_calls.append(ToolCall(name=call_json['function_name'], args=call_json['args']))
test_score, test_score_details = evaluate_test_and_details(
keywords, tool_calls, expected_tools, answer, test_case.expected_output.response
)
result = TestResult(
app=test_case.app,
index=i,
test_name=test_case.name,
score=test_score,
details=test_score_details,
)
results.append(result)
return results
def save_test_results(
results: List["TestResult"],
json_path: str = "test_results.json",
csv_path: Optional[str] = None,
) -> None:
"""
Save test results to JSON (as a list) and CSV (append rows, no duplicate headers).
"""
if csv_path is None:
csv_path = json_path[:-5] + ".csv" if json_path.endswith(".json") else json_path + ".csv"
# ---- JSON ----
# Load existing results (list), append, then overwrite
if os.path.exists(json_path) and os.path.getsize(json_path) > 0:
with open(json_path, "r", encoding="utf-8") as f:
try:
existing = json.load(f)
if not isinstance(existing, list):
existing = []
except json.JSONDecodeError:
existing = []
else:
existing = []
existing.extend(r.model_dump() for r in results)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(existing, f, indent=2, ensure_ascii=False)
# ---- CSV ----
def j(obj):
return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
rows = []
for r in results:
rows.append(
{
"app": r.app,
"index": r.index,
"test_name": r.test_name,
"keyword_score": r.score.keyword_score,
"tool_call_score": r.score.tool_call_score,
"response_score": r.score.response_score,
"expected_keywords": j(r.details.expected_keywords),
"missing_keywords": j(r.details.missing_keywords),
"tool_call_mismatches": j([m.model_dump() for m in r.details.tool_call_mismatches]),
"response_expected": r.details.response_expected,
"response_actual": r.details.response_actual,
}
)
    write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
    if rows:  # guard: DictWriter needs at least one row to derive fieldnames
        with open(csv_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            if write_header:
                writer.writeheader()
            writer.writerows(rows)
print(f"Saved {len(results)} results → JSON: {json_path} | CSV: {csv_path}")
if __name__ == "__main__":
import asyncio
import argparse
from cuga.config import settings
settings.update({"ADVANCED_FEATURES": {"TRACKER_ENABLED": True}}, merge=True)
parser = argparse.ArgumentParser(description="Run tests and save results.")
parser.add_argument("-t", "--test-file-path", required=True, help="Path to the test file")
parser.add_argument("-r", "--result-file-path", required=True, help="Path to the result file")
args = parser.parse_args()
tasks, results = asyncio.run(run_cuga(args.test_file_path, args.result_file_path))
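    # Example invocation (file paths are placeholders):
    #   python evaluate_cuga.py -t tests/test_cases.json -r results/test_results.json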