|
|
import gradio as gr |
|
|
import os |
|
|
import random |
|
|
import time |
|
|
from datetime import datetime |
|
|
from functools import partial |
|
|
import json |
|
|
import io |
|
|
from huggingface_hub import HfApi |
|
|
from huggingface_hub.hf_api import HfHubHTTPError |
|
|
import traceback |
|
|
from itertools import combinations, product |
|
|
|
|
|
|
|
|
|
|
|
REPEAT_SINGLE_TARGET_FOR_TESTING = False |
|
|
NUM_REPEATED_TRIALS_FOR_TESTING = 5 |
|
|
|
|
|
BASE_IMAGE_DIR = "/data/images/images" |
|
|
TARGET_DIR_BASENAME = "gt" |
|
|
TARGET_DIR = os.path.join(BASE_IMAGE_DIR, TARGET_DIR_BASENAME) |
|
|
|
|
|
METHOD_ROOTS = [] |
|
|
if os.path.exists(BASE_IMAGE_DIR): |
|
|
try: |
|
|
METHOD_ROOTS = [ |
|
|
os.path.join(BASE_IMAGE_DIR, d) |
|
|
for d in os.listdir(BASE_IMAGE_DIR) |
|
|
if os.path.isdir(os.path.join(BASE_IMAGE_DIR, d)) and \ |
|
|
d != TARGET_DIR_BASENAME and \ |
|
|
not d.startswith('.') |
|
|
] |
|
|
if not METHOD_ROOTS: print(f"警告:在 '{BASE_IMAGE_DIR}' 中没有找到有效的方法目录 (除了 '{TARGET_DIR_BASENAME}')。") |
|
|
else: print(f"已识别的方法根目录 (原始): {METHOD_ROOTS}") |
|
|
except Exception as e: print(f"错误:在扫描 '{BASE_IMAGE_DIR}' 时发生错误: {e}"); METHOD_ROOTS = [] |
|
|
else: print(f"警告:基础目录 '{BASE_IMAGE_DIR}' 不存在。将无法加载候选图片。") |
|
|
|
|
|
SUBJECTS = ["subj01", "subj02", "subj05", "subj07"] |
|
|
SENTINEL_TRIAL_INTERVAL = 20 |
|
|
NUM_TRIALS_PER_RUN = 100 |
|
|
LOG_BATCH_SIZE = 20 |
|
|
|
|
|
DATASET_REPO_ID = "YanmHa/image-aligned-experiment-data" |
|
|
BATCH_LOG_FOLDER = "run_logs_batch" |
|
|
CSS = ".gr-block {margin-top: 4px !important; margin-bottom: 4px !important;} .compact_button { padding: 4px 8px; min-width: auto; }" |
|
|
|
|
|
|
|
|
if REPEAT_SINGLE_TARGET_FOR_TESTING: |
|
|
print(f"--- 特殊测试模式 (重复单一目标图) 已激活 ---") |
|
|
NUM_TRIALS_PER_RUN = NUM_REPEATED_TRIALS_FOR_TESTING |
|
|
print(f"测试模式:NUM_TRIALS_PER_RUN 已被设置为: {NUM_TRIALS_PER_RUN}") |
|
|
|
|
|
if METHOD_ROOTS: |
|
|
original_method_roots_count = len(METHOD_ROOTS) |
|
|
METHOD_ROOTS = [METHOD_ROOTS[0]] |
|
|
print(f"测试模式:METHOD_ROOTS 已从 {original_method_roots_count} 个缩减为仅包含第一个方法: {METHOD_ROOTS}") |
|
|
else: |
|
|
print("测试模式警告:METHOD_ROOTS 为空,无法缩减。") |
|
|
|
|
|
if len(METHOD_ROOTS) == 1: |
|
|
if len(SUBJECTS) >= 2: |
|
|
original_subjects_count = len(SUBJECTS) |
|
|
SUBJECTS = [SUBJECTS[0], SUBJECTS[1]] |
|
|
print(f"测试模式:由于方法仅1种,SUBJECTS 已从 {original_subjects_count} 个缩减为前两个: {SUBJECTS}") |
|
|
elif SUBJECTS: |
|
|
print(f"测试模式:SUBJECTS 只有一个元素 ({SUBJECTS}),方法也只有一种。注意:可能无法形成候选对。") |
|
|
else: |
|
|
print("测试模式警告:SUBJECTS 为空。") |
|
|
print(f"--- 特殊测试模式配置结束 ---") |
|
|
else: |
|
|
print(f"正常模式:使用完整配置。每轮目标试验数: {NUM_TRIALS_PER_RUN}") |
|
|
print(f"方法根目录: {METHOD_ROOTS}") |
|
|
print(f"Subjects: {SUBJECTS}") |
|
|
|
|
|
|
|
|
PERSISTENT_STORAGE_BASE = "/data" |
|
|
DATA_SUBDIR_NAME = "my_user_study_persistent_history" |
|
|
|
|
|
if not os.path.exists(PERSISTENT_STORAGE_BASE): |
|
|
try: |
|
|
os.makedirs(PERSISTENT_STORAGE_BASE, exist_ok=True) |
|
|
print(f"信息:基础持久化目录 '{PERSISTENT_STORAGE_BASE}' 尝试确保其存在。") |
|
|
except Exception as e: |
|
|
print(f"警告:操作基础持久化目录 '{PERSISTENT_STORAGE_BASE}' 时出现问题: {e}。") |
|
|
|
|
|
full_subdir_path = os.path.join(PERSISTENT_STORAGE_BASE, DATA_SUBDIR_NAME) |
|
|
if not os.path.exists(full_subdir_path): |
|
|
try: |
|
|
os.makedirs(full_subdir_path) |
|
|
print(f"成功创建持久化子目录: {full_subdir_path}") |
|
|
except Exception as e: |
|
|
print(f"错误:创建持久化子目录 '{full_subdir_path}' 失败: {e}") |
|
|
else: |
|
|
print(f"信息:持久化子目录 '{full_subdir_path}' 已存在。") |
|
|
|
|
|
GLOBAL_HISTORY_FILE = os.path.join(full_subdir_path, "global_experiment_shown_pairs.json") |
|
|
if not (os.path.isdir(full_subdir_path) and os.access(full_subdir_path, os.W_OK)): |
|
|
print(f"严重警告:持久化子目录 '{full_subdir_path}' 无效或不可写。") |
|
|
print(f"全局历史文件将被加载/保存到: {GLOBAL_HISTORY_FILE}") |
|
|
|
|
|
global_shown_pairs_cache = {} |
|
|
global_history_has_unsaved_changes = False |
|
|
exhausted_target_images = set() |
|
|
|
|
|
def load_global_shown_pairs(): |
|
|
global global_shown_pairs_cache, global_history_has_unsaved_changes, exhausted_target_images |
|
|
exhausted_target_images = set() |
|
|
|
|
|
if not GLOBAL_HISTORY_FILE or not os.path.exists(GLOBAL_HISTORY_FILE): |
|
|
print(f"信息:全局历史文件 '{GLOBAL_HISTORY_FILE}' 未找到或路径无效。将创建新的空历史记录。") |
|
|
global_shown_pairs_cache = {} |
|
|
global_history_has_unsaved_changes = False |
|
|
return |
|
|
|
|
|
try: |
|
|
with open(GLOBAL_HISTORY_FILE, 'r', encoding='utf-8') as f: |
|
|
content = f.read() |
|
|
if not content.strip(): |
|
|
print(f"信息:全局历史文件 '{GLOBAL_HISTORY_FILE}' 为空。将使用空历史记录。") |
|
|
global_shown_pairs_cache = {} |
|
|
else: |
|
|
data_from_file = json.loads(content) |
|
|
global_shown_pairs_cache = { |
|
|
target_img: {frozenset(pair) for pair in pairs_list} |
|
|
for target_img, pairs_list in data_from_file.items() |
|
|
} |
|
|
print(f"已成功从 '{GLOBAL_HISTORY_FILE}' 加载全局已展示图片对历史。") |
|
|
except json.JSONDecodeError as jde: |
|
|
print(f"错误:加载全局历史文件 '{GLOBAL_HISTORY_FILE}' 失败 (JSON解析错误: {jde})。文件内容可能已损坏。将使用空历史记录。") |
|
|
global_shown_pairs_cache = {} |
|
|
except Exception as e: |
|
|
print(f"错误:加载全局历史文件 '{GLOBAL_HISTORY_FILE}' 时发生其他错误: {e}。将使用空历史记录。") |
|
|
global_shown_pairs_cache = {} |
|
|
global_history_has_unsaved_changes = False |
|
|
|
|
|
def save_global_shown_pairs(): |
|
|
global global_shown_pairs_cache, global_history_has_unsaved_changes |
|
|
if not GLOBAL_HISTORY_FILE: |
|
|
print("错误:GLOBAL_HISTORY_FILE 未定义。无法保存历史。") |
|
|
return False |
|
|
final_save_path = os.path.abspath(GLOBAL_HISTORY_FILE) |
|
|
try: |
|
|
parent_dir = os.path.dirname(final_save_path) |
|
|
if not os.path.exists(parent_dir): |
|
|
try: |
|
|
os.makedirs(parent_dir, exist_ok=True) |
|
|
print(f"信息: 为保存历史文件,创建了父目录 {parent_dir}") |
|
|
except Exception as e_mkdir: |
|
|
print(f"错误: 创建历史文件的父目录 {parent_dir} 失败: {e_mkdir}。保存可能失败。") |
|
|
return False |
|
|
data_to_save = { |
|
|
target_img: [sorted(list(pair_fset)) for pair_fset in pairs_set] |
|
|
for target_img, pairs_set in global_shown_pairs_cache.items() |
|
|
} |
|
|
|
|
|
temp_file_path = final_save_path + ".tmp" |
|
|
with open(temp_file_path, 'w', encoding='utf-8') as f: |
|
|
json.dump(data_to_save, f, ensure_ascii=False, indent=2) |
|
|
os.replace(temp_file_path, final_save_path) |
|
|
print(f"已成功将全局已展示图片对历史保存到 '{final_save_path}'。") |
|
|
global_history_has_unsaved_changes = False |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"错误:保存全局历史文件 '{final_save_path}' 失败: {e}") |
|
|
return False |
|
|
|
|
|
load_global_shown_pairs() |
|
|
|
|
|
|
|
|
master_image_list = [] |
|
|
if os.path.exists(TARGET_DIR): |
|
|
try: |
|
|
master_image_list = sorted( |
|
|
[f for f in os.listdir(TARGET_DIR) if f.lower().endswith((".jpg", ".png", ".jpeg"))], |
|
|
key=lambda x: int(os.path.splitext(x)[0]) |
|
|
) |
|
|
except ValueError: |
|
|
master_image_list = sorted([f for f in os.listdir(TARGET_DIR) if f.lower().endswith((".jpg", ".png", ".jpeg"))]) |
|
|
if master_image_list: print(f"警告: '{TARGET_DIR}' 文件名非纯数字,按字母排序。") |
|
|
if not master_image_list: print(f"警告:在 '{TARGET_DIR}' 中无有效图片。") |
|
|
elif not os.path.exists(TARGET_DIR) and os.path.exists(BASE_IMAGE_DIR): print(f"错误:目标目录 '{TARGET_DIR}' 未找到。") |
|
|
|
|
|
|
|
|
if REPEAT_SINGLE_TARGET_FOR_TESTING: |
|
|
if not master_image_list: |
|
|
print(f"测试模式错误:master_image_list 为空,无法进行重复单一目标图测试。") |
|
|
else: |
|
|
original_first_image = master_image_list[0] |
|
|
master_image_list = [original_first_image] |
|
|
print(f"测试模式:master_image_list 已被缩减为原列表的第一个图像: {master_image_list}") |
|
|
if not master_image_list: |
|
|
print(f"关键错误:无目标图片可用 (master_image_list为空)。实验无法进行。") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_next_trial_info(current_trial_idx_in_run, current_run_image_list_for_trial, num_trials_in_this_run_for_trial): |
|
|
global TARGET_DIR, METHOD_ROOTS, SUBJECTS, SENTINEL_TRIAL_INTERVAL |
|
|
global global_shown_pairs_cache, global_history_has_unsaved_changes, exhausted_target_images |
|
|
|
|
|
if not current_run_image_list_for_trial or current_trial_idx_in_run >= num_trials_in_this_run_for_trial: |
|
|
return None, current_trial_idx_in_run |
|
|
|
|
|
img_filename_original = current_run_image_list_for_trial[current_trial_idx_in_run] |
|
|
target_full_path = os.path.join(TARGET_DIR, img_filename_original) |
|
|
trial_number_for_display = current_trial_idx_in_run + 1 |
|
|
|
|
|
|
|
|
pool_image_flited = [] |
|
|
pool_reconed_image_color = [] |
|
|
|
|
|
|
|
|
combined_pool_for_sentinel = [] |
|
|
|
|
|
for m_root_path in METHOD_ROOTS: |
|
|
method_name = os.path.basename(m_root_path) |
|
|
|
|
|
subjects_for_method = SUBJECTS |
|
|
if method_name.lower() == "takagi": |
|
|
if "subj01" in SUBJECTS: |
|
|
subjects_for_method = ["subj01"] |
|
|
else: |
|
|
continue |
|
|
|
|
|
for s_id in subjects_for_method: |
|
|
base, ext = os.path.splitext(img_filename_original) |
|
|
reconstructed_filename = f"{base}_0{ext}" |
|
|
candidate_path = os.path.join(m_root_path, s_id, reconstructed_filename) |
|
|
if os.path.exists(candidate_path): |
|
|
internal_label = f"{method_name}/{s_id}/{reconstructed_filename}" |
|
|
candidate_tuple = (internal_label, candidate_path) |
|
|
|
|
|
|
|
|
if method_name == "image_flited": |
|
|
pool_image_flited.append(candidate_tuple) |
|
|
elif method_name == "generated_images_color": |
|
|
pool_reconed_image_color.append(candidate_tuple) |
|
|
|
|
|
|
|
|
combined_pool_for_sentinel.append(candidate_tuple) |
|
|
|
|
|
|
|
|
trial_info = {"image_id": img_filename_original, "target_path": target_full_path, "cur_no": trial_number_for_display, "is_sentinel": False, |
|
|
"left_display_label": "N/A", "left_internal_label": "N/A", "left_path": None, |
|
|
"right_display_label": "N/A", "right_internal_label": "N/A", "right_path": None} |
|
|
|
|
|
is_potential_sentinel_trial = (trial_number_for_display > 0 and trial_number_for_display % SENTINEL_TRIAL_INTERVAL == 0) |
|
|
|
|
|
if is_potential_sentinel_trial: |
|
|
|
|
|
if not combined_pool_for_sentinel: |
|
|
print(f"警告:哨兵图 '{img_filename_original}' (trial {trial_number_for_display}) 无任何候选图。") |
|
|
else: |
|
|
print(f"生成哨兵试验 for '{img_filename_original}' (trial {trial_number_for_display})") |
|
|
trial_info["is_sentinel"] = True |
|
|
sentinel_candidate_target_tuple = ("目标图像", target_full_path) |
|
|
random_reconstruction_candidate_tuple = random.choice(combined_pool_for_sentinel) |
|
|
candidates_for_sentinel = [ |
|
|
(("目标图像", target_full_path), sentinel_candidate_target_tuple[0]), |
|
|
(("重建图", random_reconstruction_candidate_tuple[1]), random_reconstruction_candidate_tuple[0]) |
|
|
] |
|
|
random.shuffle(candidates_for_sentinel) |
|
|
trial_info.update({ |
|
|
"left_display_label": candidates_for_sentinel[0][0][0], "left_path": candidates_for_sentinel[0][0][1], "left_internal_label": candidates_for_sentinel[0][1], |
|
|
"right_display_label": candidates_for_sentinel[1][0][0], "right_path": candidates_for_sentinel[1][0][1], "right_internal_label": candidates_for_sentinel[1][1], |
|
|
}) |
|
|
else: |
|
|
|
|
|
|
|
|
if not pool_image_flited or not pool_reconed_image_color: |
|
|
print(f"警告:常规图 '{img_filename_original}' (trial {trial_number_for_display}) 候选不足以形成 'image_flited' vs 'reconed_image_color' 对。 " |
|
|
f"('image_flited' 找到 {len(pool_image_flited)} 个, " |
|
|
f"'reconed_image_color' 找到 {len(pool_reconed_image_color)} 个)。此试验无法进行。") |
|
|
return None, current_trial_idx_in_run |
|
|
|
|
|
target_global_history_set = global_shown_pairs_cache.setdefault(img_filename_original, set()) |
|
|
|
|
|
|
|
|
all_possible_pairs_in_pool = [] |
|
|
for c_flited, c_reconed in product(pool_image_flited, pool_reconed_image_color): |
|
|
pair_labels_fset = frozenset({c_flited[0], c_reconed[0]}) |
|
|
all_possible_pairs_in_pool.append( ((c_flited, c_reconed), pair_labels_fset) ) |
|
|
|
|
|
|
|
|
unseen_globally_pairs_with_data = [ |
|
|
item for item in all_possible_pairs_in_pool if item[1] not in target_global_history_set |
|
|
] |
|
|
selected_candidates_tuples = None |
|
|
|
|
|
if unseen_globally_pairs_with_data: |
|
|
chosen_pair_data_and_labels = random.choice(unseen_globally_pairs_with_data) |
|
|
selected_candidates_tuples = chosen_pair_data_and_labels[0] |
|
|
chosen_pair_frozenset = chosen_pair_data_and_labels[1] |
|
|
target_global_history_set.add(chosen_pair_frozenset) |
|
|
global_history_has_unsaved_changes = True |
|
|
else: |
|
|
print(f"警告:目标图 '{img_filename_original}' (trial {trial_number_for_display}): 所有 ({len(all_possible_pairs_in_pool)}) 个 'image_flited' vs 'reconed_image_color' 对均已在全局展示过。") |
|
|
if all_possible_pairs_in_pool: |
|
|
print(f"目标图 '{img_filename_original}' 将被标记为已耗尽,未来轮次中将被跳过。") |
|
|
exhausted_target_images.add(img_filename_original) |
|
|
return None, current_trial_idx_in_run |
|
|
|
|
|
display_order_candidates = list(selected_candidates_tuples) |
|
|
if random.random() > 0.5: |
|
|
display_order_candidates = display_order_candidates[::-1] |
|
|
trial_info.update({ |
|
|
"left_display_label": "候选图 1", "left_path": display_order_candidates[0][1], "left_internal_label": display_order_candidates[0][0], |
|
|
"right_display_label": "候选图 2", "right_path": display_order_candidates[1][1], "right_internal_label": display_order_candidates[1][0], |
|
|
}) |
|
|
return trial_info, current_trial_idx_in_run + 1 |
|
|
|
|
|
|
|
|
|
|
|
def save_single_log_to_hf_dataset(log_entry, user_identifier_str): |
|
|
global DATASET_REPO_ID, INDIVIDUAL_LOGS_FOLDER |
|
|
if not isinstance(log_entry, dict): |
|
|
print(f"错误:单个日志条目不是字典格式,无法保存:{log_entry}") |
|
|
return False |
|
|
current_user_id = user_identifier_str if user_identifier_str else "unknown_user_session" |
|
|
identifier_safe = str(current_user_id).replace('.', '_').replace(':', '_').replace('/', '_') |
|
|
print(f"用户 {identifier_safe} - 准备保存单条日志 for image {log_entry.get('image_id', 'Unknown')}...") |
|
|
try: |
|
|
token = os.getenv("HF_TOKEN") |
|
|
if not token: |
|
|
print("错误:环境变量 HF_TOKEN 未设置。无法保存单条日志到Dataset。") |
|
|
return False |
|
|
if not DATASET_REPO_ID: |
|
|
print("错误:DATASET_REPO_ID 未配置。无法保存单条日志到Dataset。") |
|
|
return False |
|
|
api = HfApi(token=token) |
|
|
image_id_safe_for_filename = os.path.splitext(log_entry.get("image_id", "unknown_img"))[0].replace('.', '_').replace(':', '_').replace('/', '_') |
|
|
file_creation_timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f') |
|
|
unique_filename = (f"run{log_entry.get('run_no', 'X')}_trial{log_entry.get('trial_sequence_in_run', 'Y')}_img{image_id_safe_for_filename}_{file_creation_timestamp_str}.json") |
|
|
path_in_repo = f"{INDIVIDUAL_LOGS_FOLDER}/{identifier_safe}/{unique_filename}" |
|
|
|
|
|
try: |
|
|
json_content = json.dumps(log_entry, ensure_ascii=False, indent=2) |
|
|
except Exception as json_err: |
|
|
print(f"错误:序列化单条日志时出错: {log_entry}. 错误: {json_err}") |
|
|
error_log_content = {"error": "serialization_failed_single", "original_data_keys": list(log_entry.keys()) if isinstance(log_entry, dict) else None, "timestamp": datetime.now().isoformat()} |
|
|
json_content = json.dumps(error_log_content, ensure_ascii=False, indent=2) |
|
|
|
|
|
log_bytes = json_content.encode('utf-8') |
|
|
file_like_object = io.BytesIO(log_bytes) |
|
|
print(f"准备上传单条日志文件: {path_in_repo} ({len(log_bytes)} bytes)") |
|
|
api.upload_file( |
|
|
path_or_fileobj=file_like_object, |
|
|
path_in_repo=path_in_repo, |
|
|
repo_id=DATASET_REPO_ID, |
|
|
repo_type="dataset", |
|
|
commit_message=(f"Log choice: img {log_entry.get('image_id', 'N/A')}, run {log_entry.get('run_no', 'N/A')}, trial {log_entry.get('trial_sequence_in_run', 'N/A')} by {identifier_safe}") |
|
|
) |
|
|
print(f"单条日志已成功保存到 HF Dataset: {DATASET_REPO_ID}/{path_in_repo}") |
|
|
return True |
|
|
except HfHubHTTPError as hf_http_error: |
|
|
print(f"保存单条日志到 Hugging Face Dataset 时发生 HTTP 错误 (可能被限流或权限问题): {hf_http_error}") |
|
|
traceback.print_exc() |
|
|
return False |
|
|
except Exception as e: |
|
|
print(f"保存单条日志 (image {log_entry.get('image_id', 'Unknown')}, user {identifier_safe}) 到 Hugging Face Dataset 时发生严重错误: {e}") |
|
|
traceback.print_exc() |
|
|
return False |
|
|
|
|
|
|
|
|
def save_collected_logs_batch(list_of_log_entries, user_identifier_str, batch_identifier): |
|
|
global DATASET_REPO_ID, BATCH_LOG_FOLDER |
|
|
if not list_of_log_entries: |
|
|
print("批量保存用户日志:没有累积的日志。") |
|
|
return True |
|
|
identifier_safe = str(user_identifier_str if user_identifier_str else "unknown_user_session").replace('.', '_').replace(':', '_').replace('/', '_').replace(' ', '_') |
|
|
print(f"用户 {identifier_safe} - 准备批量保存 {len(list_of_log_entries)} 条选择日志 (批次标识: {batch_identifier})...") |
|
|
try: |
|
|
token = os.getenv("HF_TOKEN") |
|
|
if not token: |
|
|
print("错误:HF_TOKEN 未设置。无法批量保存选择日志。") |
|
|
return False |
|
|
if not DATASET_REPO_ID: |
|
|
print("错误:DATASET_REPO_ID 未配置。无法批量保存选择日志。") |
|
|
return False |
|
|
api = HfApi(token=token) |
|
|
timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f') |
|
|
batch_filename = f"batch_user-{identifier_safe}_id-{batch_identifier}_{timestamp_str}_logs-{len(list_of_log_entries)}.jsonl" |
|
|
path_in_repo = f"{BATCH_LOG_FOLDER}/{identifier_safe}/{batch_filename}" |
|
|
jsonl_content = "" |
|
|
for log_entry in list_of_log_entries: |
|
|
try: |
|
|
if isinstance(log_entry, dict): jsonl_content += json.dumps(log_entry, ensure_ascii=False) + "\n" |
|
|
else: print(f"警告:批量保存选择日志时,条目非字典:{log_entry}") |
|
|
except Exception as json_err: |
|
|
print(f"错误:批量保存选择日志序列化单条时出错: {log_entry}. 错误: {json_err}") |
|
|
jsonl_content += json.dumps({"error": "serialization_failed_in_batch_user_log", "original_data_preview": str(log_entry)[:100],"timestamp": datetime.now().isoformat()}, ensure_ascii=False) + "\n" |
|
|
|
|
|
if not jsonl_content.strip(): |
|
|
print(f"用户 {identifier_safe} (批次 {batch_identifier}) 无可序列化选择日志。") |
|
|
return True |
|
|
|
|
|
log_bytes = jsonl_content.encode('utf-8') |
|
|
file_like_object = io.BytesIO(log_bytes) |
|
|
print(f"准备批量上传选择日志文件: {path_in_repo} ({len(log_bytes)} bytes)") |
|
|
api.upload_file( |
|
|
path_or_fileobj=file_like_object, |
|
|
path_in_repo=path_in_repo, |
|
|
repo_id=DATASET_REPO_ID, |
|
|
repo_type="dataset", |
|
|
commit_message=f"Batch user choice logs for {identifier_safe}, batch_id {batch_identifier} ({len(list_of_log_entries)} entries)" |
|
|
) |
|
|
print(f"批量选择日志已成功保存到 HF Dataset: {DATASET_REPO_ID}/{path_in_repo}") |
|
|
return True |
|
|
except HfHubHTTPError as hf_http_error: |
|
|
print(f"批量保存选择日志到 Hugging Face Dataset 时发生 HTTP 错误 (可能被限流或权限问题): {hf_http_error}") |
|
|
traceback.print_exc() |
|
|
return False |
|
|
except Exception as e: |
|
|
print(f"批量保存选择日志 (user {identifier_safe}, batch_id {batch_identifier}) 失败: {e}") |
|
|
traceback.print_exc() |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
def process_experiment_step( |
|
|
s_trial_idx_val, s_run_no_val, s_user_logs_val, s_current_trial_data_val, s_user_session_id_val, |
|
|
s_current_run_image_list_val, s_num_trials_this_run_val, |
|
|
action_type=None, choice_value=None, request: gr.Request = None |
|
|
): |
|
|
global master_image_list, NUM_TRIALS_PER_RUN, outputs_ui_components_definition, LOG_BATCH_SIZE |
|
|
global REPEAT_SINGLE_TARGET_FOR_TESTING, NUM_REPEATED_TRIALS_FOR_TESTING |
|
|
global exhausted_target_images, global_history_has_unsaved_changes |
|
|
|
|
|
output_s_trial_idx = s_trial_idx_val; output_s_run_no = s_run_no_val |
|
|
output_s_user_logs = list(s_user_logs_val); output_s_current_trial_data = dict(s_current_trial_data_val) if s_current_trial_data_val else {} |
|
|
output_s_user_session_id = s_user_session_id_val; output_s_current_run_image_list = list(s_current_run_image_list_val) |
|
|
output_s_num_trials_this_run = s_num_trials_this_run_val |
|
|
user_ip_fallback = request.client.host if request else "unknown_ip" |
|
|
user_identifier_for_logging = output_s_user_session_id if output_s_user_session_id else user_ip_fallback |
|
|
|
|
|
len_ui_outputs = len(outputs_ui_components_definition) |
|
|
|
|
|
def create_ui_error_tuple(message, progress_msg_text, stop_experiment=False): |
|
|
btn_start_interactive = not stop_experiment |
|
|
btn_choices_interactive = not stop_experiment |
|
|
return (gr.update(visible=False),) * 3 + \ |
|
|
("", "") + \ |
|
|
(message, progress_msg_text) + \ |
|
|
(gr.update(interactive=btn_start_interactive), gr.update(interactive=btn_choices_interactive), gr.update(interactive=btn_choices_interactive)) + \ |
|
|
(gr.update(visible=False),) |
|
|
|
|
|
def create_no_change_tuple(): return (gr.update(),) * len_ui_outputs |
|
|
user_id_display_text = output_s_user_session_id if output_s_user_session_id else "用户ID待分配" |
|
|
|
|
|
if action_type == "record_choice": |
|
|
if output_s_current_trial_data.get("data") and output_s_current_trial_data["data"].get("left_internal_label"): |
|
|
chosen_internal_label = (output_s_current_trial_data["data"]["left_internal_label"] if choice_value == "left" else output_s_current_trial_data["data"]["right_internal_label"]) |
|
|
parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = "N/A", "N/A", "N/A" |
|
|
if chosen_internal_label == "目标图像": parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = "TARGET", "GT", output_s_current_trial_data["data"]["image_id"] |
|
|
else: |
|
|
parts = chosen_internal_label.split('/'); |
|
|
if len(parts) == 3: parsed_chosen_method, parsed_chosen_subject, parsed_chosen_filename = parts[0].strip(), parts[1].strip(), parts[2].strip() |
|
|
elif len(parts) == 2: parsed_chosen_method, parsed_chosen_subject = parts[0].strip(), parts[1].strip() |
|
|
elif len(parts) == 1: parsed_chosen_method = parts[0].strip() |
|
|
log_entry = { |
|
|
"timestamp": datetime.now().isoformat(), "user_identifier": user_identifier_for_logging, "run_no": output_s_run_no, |
|
|
"image_id": output_s_current_trial_data["data"]["image_id"], |
|
|
"left_internal_label": output_s_current_trial_data["data"]["left_internal_label"], |
|
|
"right_internal_label": output_s_current_trial_data["data"]["right_internal_label"], |
|
|
"chosen_side": choice_value, "chosen_internal_label": chosen_internal_label, |
|
|
"chosen_method": parsed_chosen_method, "chosen_subject": parsed_chosen_subject, "chosen_filename": parsed_chosen_filename, |
|
|
"trial_sequence_in_run": output_s_current_trial_data["data"]["cur_no"], |
|
|
"is_sentinel": output_s_current_trial_data["data"]["is_sentinel"] |
|
|
} |
|
|
output_s_user_logs.append(log_entry) |
|
|
print(f"用户 {user_identifier_for_logging} 记录选择 (img: {log_entry['image_id']})。当前批次日志数: {len(output_s_user_logs)}") |
|
|
|
|
|
if len(output_s_user_logs) >= LOG_BATCH_SIZE: |
|
|
print(f"累积用户选择日志达到 {LOG_BATCH_SIZE} 条,准备批量保存...") |
|
|
batch_id_for_filename = f"run{output_s_run_no}_trialidx{output_s_trial_idx}_logcount{len(output_s_user_logs)}" |
|
|
user_logs_save_success = save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename) |
|
|
if user_logs_save_success: |
|
|
print("批量用户选择日志已成功(或尝试)保存,将清空累积的用户选择日志列表。") |
|
|
output_s_user_logs = [] |
|
|
else: |
|
|
print("严重错误:批量用户选择日志保存失败。实验无法继续。") |
|
|
error_message_ui = "错误:日志保存失败,可能是网络问题或API限流。实验已停止,请联系管理员。" |
|
|
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止在第 {output_s_run_no} 轮,试验 {output_s_trial_idx+1}" |
|
|
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True) |
|
|
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, \ |
|
|
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates |
|
|
|
|
|
if global_history_has_unsaved_changes: |
|
|
print("检测到全局图片对历史自上次保存后有更新,将一并保存...") |
|
|
if not save_global_shown_pairs(): |
|
|
print("严重错误:全局图片对历史保存失败。实验无法继续。") |
|
|
error_message_ui = "错误:全局历史数据保存失败。实验已停止,请联系管理员。" |
|
|
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止在第 {output_s_run_no} 轮,试验 {output_s_trial_idx+1}" |
|
|
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True) |
|
|
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, \ |
|
|
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates |
|
|
else: |
|
|
print(f"用户 {user_identifier_for_logging} 错误:记录选择时当前试验数据为空或缺少internal_label!") |
|
|
error_ui_updates = create_ui_error_tuple("记录选择时内部错误。", f"用户ID: {user_id_display_text} | 进度:{output_s_trial_idx}/{output_s_num_trials_this_run}", stop_experiment=False) |
|
|
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates |
|
|
|
|
|
if action_type == "start_experiment": |
|
|
is_first = (output_s_num_trials_this_run == 0 and output_s_trial_idx == 0 and output_s_run_no == 1) |
|
|
is_completed_for_restart = (output_s_num_trials_this_run > 0 and output_s_trial_idx >= output_s_num_trials_this_run) |
|
|
|
|
|
if is_completed_for_restart: |
|
|
if output_s_user_logs: |
|
|
print(f"轮次 {output_s_run_no-1} 结束,尝试保存剩余的 {len(output_s_user_logs)} 条用户选择日志...") |
|
|
batch_id_for_filename = f"run{output_s_run_no-1}_final_logcount{len(output_s_user_logs)}" |
|
|
if not save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename): |
|
|
print("严重错误:保存上一轮剩余用户选择日志失败。实验无法继续。") |
|
|
error_message_ui = "错误:日志保存失败。实验已停止,请联系管理员。" |
|
|
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止" |
|
|
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True) |
|
|
return output_s_trial_idx, output_s_run_no-1, output_s_user_logs, output_s_current_trial_data, \ |
|
|
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates |
|
|
output_s_user_logs = [] |
|
|
|
|
|
if global_history_has_unsaved_changes: |
|
|
print("轮次结束,尝试保存全局图片对历史...") |
|
|
if not save_global_shown_pairs(): |
|
|
print("严重错误:全局历史数据保存失败。实验无法继续。") |
|
|
error_message_ui = "错误:全局历史数据保存失败。实验已停止,请联系管理员。" |
|
|
progress_message_ui = f"用户ID: {user_id_display_text} | 实验因错误停止" |
|
|
error_ui_updates = create_ui_error_tuple(error_message_ui, progress_message_ui, stop_experiment=True) |
|
|
return output_s_trial_idx, output_s_run_no-1, output_s_user_logs, output_s_current_trial_data, \ |
|
|
output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *error_ui_updates |
|
|
|
|
|
if is_first or is_completed_for_restart: |
|
|
if is_completed_for_restart: output_s_run_no += 1 |
|
|
available_master_images = [img for img in master_image_list if img not in exhausted_target_images] |
|
|
print(f"开始轮次 {output_s_run_no}: 从 {len(master_image_list)}个总目标图片中筛选,可用图片 {len(available_master_images)}个 (已排除 {len(exhausted_target_images)}个已耗尽图片).") |
|
|
if not available_master_images: |
|
|
msg = "所有目标图片的所有唯一图片对均已展示完毕!感谢您的参与。" |
|
|
prog_text = f"用户ID: {user_id_display_text} | 实验完成!" |
|
|
if output_s_user_logs: |
|
|
print(f"最终轮次结束,尝试保存剩余的 {len(output_s_user_logs)} 条用户选择日志...") |
|
|
batch_id_for_filename = f"run{output_s_run_no-1}_final_logcount{len(output_s_user_logs)}" |
|
|
save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename) |
|
|
output_s_user_logs = [] |
|
|
if global_history_has_unsaved_changes: |
|
|
print("实验最终结束,尝试保存全局图片对历史...") |
|
|
save_global_shown_pairs() |
|
|
|
|
|
ui_updates = list(create_ui_error_tuple(msg, prog_text, stop_experiment=True)) |
|
|
return 0, output_s_run_no, [], {}, output_s_user_session_id, [], 0, *tuple(ui_updates) |
|
|
|
|
|
if REPEAT_SINGLE_TARGET_FOR_TESTING and available_master_images: |
|
|
print(f"测试模式 (重复单一目标图) 已激活。") |
|
|
single_image_to_repeat = available_master_images[0] |
|
|
output_s_current_run_image_list = [single_image_to_repeat] * NUM_REPEATED_TRIALS_FOR_TESTING |
|
|
output_s_num_trials_this_run = NUM_REPEATED_TRIALS_FOR_TESTING |
|
|
print(f"测试模式:本轮将重复目标图片 '{single_image_to_repeat}' 共 {output_s_num_trials_this_run} 次。") |
|
|
else: |
|
|
num_really_avail = len(available_master_images) |
|
|
current_run_max_trials = NUM_TRIALS_PER_RUN |
|
|
run_size = min(num_really_avail, current_run_max_trials) |
|
|
if run_size == 0: |
|
|
error_ui = create_ui_error_tuple("错误: 可用图片采样数为0!", f"用户ID: {user_id_display_text} | 进度: 0/0", stop_experiment=False) |
|
|
return 0, output_s_run_no, output_s_user_logs, {}, output_s_user_session_id, [], 0, *error_ui |
|
|
output_s_current_run_image_list = random.sample(available_master_images, run_size) |
|
|
output_s_num_trials_this_run = run_size |
|
|
|
|
|
output_s_trial_idx = 0 |
|
|
output_s_current_trial_data = {} |
|
|
if is_first: |
|
|
timestamp_str = datetime.now().strftime('%Y%m%d%H%M%S%f'); random_val = random.randint(10000, 99999) |
|
|
if not output_s_user_session_id: |
|
|
output_s_user_session_id = f"user_{timestamp_str}_{random_val}"; user_identifier_for_logging = output_s_user_session_id |
|
|
else: |
|
|
user_identifier_for_logging = output_s_user_session_id |
|
|
print(f"用户会话ID: {output_s_user_session_id}") |
|
|
print(f"开始/继续轮次 {output_s_run_no} (用户ID: {output_s_user_session_id}). 本轮共 {output_s_num_trials_this_run} 个试验。") |
|
|
else: |
|
|
print(f"用户 {user_identifier_for_logging} 在第 {output_s_run_no} 轮,试验 {output_s_trial_idx} 点击开始,但轮次未完成。忽略。") |
|
|
no_change_ui = create_no_change_tuple() |
|
|
return output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *no_change_ui |
|
|
|
|
|
current_actual_trial_index_for_get_next = output_s_trial_idx |
|
|
|
|
|
if current_actual_trial_index_for_get_next >= output_s_num_trials_this_run and output_s_num_trials_this_run > 0: |
|
|
prog_text = f"用户ID: {output_s_user_session_id} | 进度:{output_s_num_trials_this_run}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮 🎉" |
|
|
ui_updates = list(create_ui_error_tuple(f"🎉 第 {output_s_run_no} 轮完成!请点击“开始试验 / 下一轮”继续。", prog_text, stop_experiment=False)) |
|
|
ui_updates[7]=gr.update(interactive=True); ui_updates[8]=gr.update(interactive=False); ui_updates[9]=gr.update(interactive=False) |
|
|
ui_updates[0]=gr.update(value=None,visible=False); ui_updates[1]=gr.update(value=None,visible=False); ui_updates[2]=gr.update(value=None,visible=False) |
|
|
yield output_s_trial_idx, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_updates; return |
|
|
|
|
|
|
|
|
if not output_s_current_run_image_list or output_s_num_trials_this_run == 0: |
|
|
error_ui = create_ui_error_tuple("错误: 无法加载试验图片 (列表为空或试验数为0)", f"用户ID: {user_id_display_text} | 进度: N/A", stop_experiment=False) |
|
|
return output_s_trial_idx, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, [], 0, *error_ui |
|
|
|
|
|
trial_info = None |
|
|
next_s_trial_idx_for_state_loop = current_actual_trial_index_for_get_next |
|
|
|
|
|
while next_s_trial_idx_for_state_loop < output_s_num_trials_this_run: |
|
|
current_target_image_for_trial = output_s_current_run_image_list[next_s_trial_idx_for_state_loop] |
|
|
if current_target_image_for_trial in exhausted_target_images: |
|
|
print(f"信息:目标图 '{current_target_image_for_trial}' 已在全局耗尽列表中,跳过此试验。") |
|
|
next_s_trial_idx_for_state_loop += 1 |
|
|
output_s_trial_idx = next_s_trial_idx_for_state_loop |
|
|
continue |
|
|
|
|
|
_trial_info_candidate, _returned_next_idx = get_next_trial_info(next_s_trial_idx_for_state_loop, output_s_current_run_image_list, output_s_num_trials_this_run) |
|
|
|
|
|
if _trial_info_candidate is not None: |
|
|
trial_info = _trial_info_candidate |
|
|
output_s_trial_idx = _returned_next_idx |
|
|
break |
|
|
else: |
|
|
print(f"信息:目标图 '{current_target_image_for_trial}' 无法生成有效试验。尝试列表中的下一个。") |
|
|
next_s_trial_idx_for_state_loop +=1 |
|
|
output_s_trial_idx = next_s_trial_idx_for_state_loop |
|
|
|
|
|
if trial_info is None: |
|
|
print(f"轮次 {output_s_run_no} 中没有更多可用的有效试验了。结束本轮。") |
|
|
if output_s_user_logs: |
|
|
print(f"轮次 {output_s_run_no} 无更多有效试验,尝试保存剩余 {len(output_s_user_logs)} 条日志...") |
|
|
batch_id_for_filename = f"run{output_s_run_no}_no_more_trials_logcount{len(output_s_user_logs)}" |
|
|
if not save_collected_logs_batch(list(output_s_user_logs), user_identifier_for_logging, batch_id_for_filename): |
|
|
print("严重错误:保存剩余日志失败。实验可能需要停止。") |
|
|
output_s_user_logs = [] |
|
|
if global_history_has_unsaved_changes: |
|
|
print("轮次无更多有效试验,尝试保存全局图片对历史...") |
|
|
if not save_global_shown_pairs(): |
|
|
print("严重错误:全局历史数据保存失败。实验可能需要停止。") |
|
|
|
|
|
prog_text = f"用户ID: {output_s_user_session_id} | 进度:{output_s_num_trials_this_run}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮 (无更多可用试验)" |
|
|
ui_updates = list(create_ui_error_tuple(f"第 {output_s_run_no} 轮因无更多可用试验而结束。请点击“开始试验 / 下一轮”。", prog_text, stop_experiment=False)) |
|
|
ui_updates[7]=gr.update(interactive=True); ui_updates[8]=gr.update(interactive=False); ui_updates[9]=gr.update(interactive=False) |
|
|
ui_updates[0]=gr.update(value=None,visible=False); ui_updates[1]=gr.update(value=None,visible=False); ui_updates[2]=gr.update(value=None,visible=False) |
|
|
yield output_s_num_trials_this_run, output_s_run_no, output_s_user_logs, {"data": None}, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_updates; return |
|
|
|
|
|
output_s_current_trial_data = {"data": trial_info} |
|
|
prog_text = f"用户ID: {output_s_user_session_id} | 进度:{trial_info['cur_no']}/{output_s_num_trials_this_run} | 第 {output_s_run_no} 轮" |
|
|
|
|
|
ui_show_target_updates = list(create_no_change_tuple()) |
|
|
ui_show_target_updates[0]=gr.update(value=trial_info["target_path"],visible=True); ui_show_target_updates[1]=gr.update(value=None,visible=False); ui_show_target_updates[2]=gr.update(value=None,visible=False) |
|
|
ui_show_target_updates[3]=""; ui_show_target_updates[4]=""; ui_show_target_updates[5]="请观察原图…"; ui_show_target_updates[6]=prog_text |
|
|
ui_show_target_updates[7]=gr.update(interactive=False); ui_show_target_updates[8]=gr.update(interactive=False); ui_show_target_updates[9]=gr.update(interactive=False) |
|
|
yield output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_show_target_updates |
|
|
|
|
|
time.sleep(3) |
|
|
|
|
|
ui_show_candidates_updates = list(create_no_change_tuple()) |
|
|
ui_show_candidates_updates[0]=gr.update(value=None,visible=False); ui_show_candidates_updates[1]=gr.update(value=trial_info["left_path"],visible=True); ui_show_candidates_updates[2]=gr.update(value=trial_info["right_path"],visible=True) |
|
|
ui_show_candidates_updates[3]=gr.update(value=trial_info["left_display_label"], visible=True); ui_show_candidates_updates[4]=gr.update(value=trial_info["right_display_label"], visible=True) |
|
|
ui_show_candidates_updates[5]="请选择更像原图的一张"; ui_show_candidates_updates[6]=prog_text |
|
|
ui_show_candidates_updates[7]=gr.update(interactive=False); ui_show_candidates_updates[8]=gr.update(interactive=True); ui_show_candidates_updates[9]=gr.update(interactive=True) |
|
|
yield output_s_trial_idx, output_s_run_no, output_s_user_logs, output_s_current_trial_data, output_s_user_session_id, output_s_current_run_image_list, output_s_num_trials_this_run, *ui_show_candidates_updates |
|
|
|
|
|
|
|
|
def handle_download_history_file(): |
|
|
global GLOBAL_HISTORY_FILE |
|
|
if os.path.exists(GLOBAL_HISTORY_FILE): |
|
|
try: |
|
|
if os.path.getsize(GLOBAL_HISTORY_FILE) > 0: |
|
|
print(f"准备提供文件下载: {GLOBAL_HISTORY_FILE}") |
|
|
return GLOBAL_HISTORY_FILE, gr.update(value=f"点击上面的链接下载 '{os.path.basename(GLOBAL_HISTORY_FILE)}'") |
|
|
else: |
|
|
print(f"历史文件 '{GLOBAL_HISTORY_FILE}' 为空,不提供下载。") |
|
|
return None, gr.update(value=f"提示: 历史文件 '{os.path.basename(GLOBAL_HISTORY_FILE)}' 当前为空。") |
|
|
except Exception as e: |
|
|
print(f"检查历史文件大小时出错 '{GLOBAL_HISTORY_FILE}': {e}") |
|
|
return None, gr.update(value=f"错误: 检查历史文件时出错。") |
|
|
else: |
|
|
print(f"请求下载历史文件,但文件 '{GLOBAL_HISTORY_FILE}' 未找到。") |
|
|
return None, gr.update(value=f"错误: JSON历史文件 '{os.path.basename(GLOBAL_HISTORY_FILE)}' 未找到。请先运行实验以生成数据并触发保存。") |
|
|
|
|
|
welcome_page_markdown = """ |
|
|
## 欢迎加入实验! |
|
|
您好!非常感谢您抽出宝贵时间参与我们的视觉偏好评估实验。您的选择将帮助我们改进重建算法,让机器生成的图像更贴近人类视觉体验! |
|
|
1. **实验目的**:通过比较两幅 重建图像 与原始 目标图像 的相似度。 |
|
|
2. **操作流程**: |
|
|
* 点击下方的「我已阅读并同意开始实验」按钮。 |
|
|
* 然后点击主实验界面的「开始试验 / 下一轮」按钮。 |
|
|
* 系统先展示一张 **目标图像**,持续 3 秒。 |
|
|
* 随后自动切换到 **两张重建图像**。 |
|
|
* 根据刚才的观察记忆,选出您认为与目标图像最相似的一张。 |
|
|
* 选择后系统会自动进入下一轮比较。 |
|
|
3. **温馨提示**: |
|
|
* 请勿刷新或关闭页面,以免中断实验。 |
|
|
* 若图片加载稍有延迟,请耐心等待;持续异常可联系邮箱 [email protected]。 |
|
|
* 本实验将保护您的任何个人隐私信息,所有数据仅用于学术研究,请您认真选择和填写。 |
|
|
4. **奖励说明**: |
|
|
* 完成全部轮次后,请截图记录您所完成的实验总数(可累积,页面左下角将显示进度,请保证截取到为您分配的ID,轮次)。 |
|
|
* 将截图发送至邮箱 [email protected],我们将在核验后发放奖励。 |
|
|
再次感谢您的参与与支持!您每一次认真选择都对我们的研究意义重大。祝您一切顺利,实验愉快! |
|
|
""" |
|
|
def handle_agree_and_start(name, gender, age, education, request: gr.Request): |
|
|
error_messages_list = [] |
|
|
if not name or str(name).strip() == "": error_messages_list.append("姓名 不能为空。") |
|
|
if gender is None or str(gender).strip() == "": error_messages_list.append("性别 必须选择。") |
|
|
if age is None: error_messages_list.append("年龄 不能为空。") |
|
|
elif not (isinstance(age, (int, float)) and 1 <= age <= 120): |
|
|
try: num_age = float(age); |
|
|
except (ValueError, TypeError): error_messages_list.append("年龄必须是一个有效的数字。") |
|
|
else: |
|
|
if not (1 <= num_age <= 120): error_messages_list.append("年龄必须在 1 到 120 之间。") |
|
|
if education is None or str(education).strip() == "其他": error_messages_list.append("学历 必须选择。") |
|
|
if error_messages_list: |
|
|
full_error_message = "请修正以下错误:\n" + "\n".join([f"- {msg}" for msg in error_messages_list]) |
|
|
print(f"用户输入验证失败: {full_error_message}") |
|
|
return gr.update(), False, gr.update(visible=True), gr.update(visible=False), full_error_message |
|
|
s_name = str(name).strip().replace(" ","_").replace("/","_").replace("\\","_") |
|
|
s_gender = str(gender).strip().replace(" ","_").replace("/","_").replace("\\","_") |
|
|
s_age = str(int(float(age))) |
|
|
s_education = str(education).strip().replace(" ","_").replace("/","_").replace("\\","_") |
|
|
user_id_str = f"N-{s_name}_G-{s_gender}_A-{s_age}_E-{s_education}" |
|
|
print(f"用户信息收集完毕,生成用户ID: {user_id_str}") |
|
|
return user_id_str, True, gr.update(visible=False), gr.update(visible=True), "" |
|
|
|
|
|
with gr.Blocks(css=CSS, title="图像重建主观评估") as demo: |
|
|
s_show_experiment_ui = gr.State(False); s_trial_index = gr.State(0); s_run_no = gr.State(1) |
|
|
s_user_logs = gr.State([]); s_current_trial_data = gr.State({}); s_user_session_id = gr.State(None) |
|
|
s_current_run_image_list = gr.State([]); s_num_trials_this_run = gr.State(0) |
|
|
|
|
|
welcome_container = gr.Column(visible=True) |
|
|
experiment_container = gr.Column(visible=False) |
|
|
|
|
|
with welcome_container: |
|
|
gr.Markdown(welcome_page_markdown) |
|
|
with gr.Row(): user_name_input = gr.Textbox(label="请输入您的姓名或代号 (例如 张三 或 User001)", placeholder="例如:张三 -> ZS"); user_gender_input = gr.Radio(label="性别", choices=["男", "女"]) |
|
|
with gr.Row(): user_age_input = gr.Number(label="年龄 (请输入1-120的整数)", minimum=1, maximum=120, step=1); user_education_input = gr.Dropdown(label="学历", choices=["其他","初中及以下","高中(含中专)", "大专(含在读)", "本科(含在读)", "硕士(含在读)", "博士(含在读)"]) |
|
|
welcome_error_msg = gr.Markdown(value="") |
|
|
btn_agree_and_start = gr.Button("我已阅读上述说明并同意参与实验") |
|
|
|
|
|
with experiment_container: |
|
|
gr.Markdown("## 🧠 图像重建主观评估实验"); gr.Markdown(f"每轮实验大约有 {NUM_TRIALS_PER_RUN} 次比较。") |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1, min_width=300): left_img = gr.Image(label="左候选图", visible=False, height=400, interactive=False); left_lbl = gr.Textbox(label="左图信息", value="", visible=True, interactive=False, max_lines=1); btn_left = gr.Button("选择左图 (更相似)", interactive=False, elem_classes="compact_button") |
|
|
with gr.Column(scale=1, min_width=300): right_img = gr.Image(label="右候选图", visible=False, height=400, interactive=False); right_lbl = gr.Textbox(label="右图信息",value="", visible=True, interactive=False, max_lines=1); btn_right = gr.Button("选择右图 (更相似)", interactive=False, elem_classes="compact_button") |
|
|
with gr.Row(): target_img = gr.Image(label="目标图像 (观察3秒后消失)", visible=False, height=400, interactive=False) |
|
|
with gr.Row(): status_text = gr.Markdown(value="请点击“开始试验 / 下一轮”按钮。") |
|
|
with gr.Row(): progress_text = gr.Markdown() |
|
|
with gr.Row(): |
|
|
btn_start = gr.Button("开始试验 / 下一轮") |
|
|
btn_download_json = gr.Button("下载JSON历史记录") |
|
|
json_download_output = gr.File(label="下载的文件会在此处提供", interactive=False) |
|
|
file_out_placeholder = gr.File(label=" ", visible=False, interactive=False) |
|
|
|
|
|
outputs_ui_components_definition = [ |
|
|
target_img, left_img, right_img, left_lbl, right_lbl, status_text, progress_text, |
|
|
btn_start, btn_left, btn_right, file_out_placeholder |
|
|
] |
|
|
click_inputs_base = [ |
|
|
s_trial_index, s_run_no, s_user_logs, s_current_trial_data, s_user_session_id, |
|
|
s_current_run_image_list, s_num_trials_this_run |
|
|
] |
|
|
event_outputs = [ |
|
|
s_trial_index, s_run_no, s_user_logs, s_current_trial_data, s_user_session_id, |
|
|
s_current_run_image_list, s_num_trials_this_run, *outputs_ui_components_definition |
|
|
] |
|
|
|
|
|
btn_agree_and_start.click(fn=handle_agree_and_start, inputs=[user_name_input, user_gender_input, user_age_input, user_education_input], outputs=[s_user_session_id, s_show_experiment_ui, welcome_container, experiment_container, welcome_error_msg]) |
|
|
btn_start.click(fn=partial(process_experiment_step, action_type="start_experiment"), inputs=click_inputs_base, outputs=event_outputs, queue=True) |
|
|
btn_left.click(fn=partial(process_experiment_step, action_type="record_choice", choice_value="left"), inputs=click_inputs_base, outputs=event_outputs, queue=True) |
|
|
btn_right.click(fn=partial(process_experiment_step, action_type="record_choice", choice_value="right"), inputs=click_inputs_base, outputs=event_outputs, queue=True) |
|
|
btn_download_json.click(fn=handle_download_history_file, inputs=None, outputs=[json_download_output, status_text]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
if not master_image_list: print("\n关键错误:程序无法启动,因无目标图片。"); exit() |
|
|
else: |
|
|
print(f"从 '{TARGET_DIR}' 加载 {len(master_image_list)} 张目标图片。") |
|
|
if not METHOD_ROOTS: print(f"警告: '{BASE_IMAGE_DIR}' 无候选方法子目录。") |
|
|
if not SUBJECTS: print("警告: SUBJECTS 列表为空。") |
|
|
print(f"用户选择日志保存到 Dataset: '{DATASET_REPO_ID}' 的 '{BATCH_LOG_FOLDER}/ 文件夹") |
|
|
if not os.getenv("HF_TOKEN"): print("警告: HF_TOKEN 未设置。日志无法保存到Hugging Face Dataset。\n 请在 Space Secrets 中设置 HF_TOKEN。") |
|
|
else: print("HF_TOKEN 已找到。") |
|
|
print(f"全局图片对历史将从 '{GLOBAL_HISTORY_FILE}' 加载/保存到此文件。") |
|
|
|
|
|
allowed_paths_list = [] |
|
|
image_base_dir_to_allow = BASE_IMAGE_DIR |
|
|
if os.path.exists(image_base_dir_to_allow) and os.path.isdir(image_base_dir_to_allow): |
|
|
allowed_paths_list.append(os.path.abspath(image_base_dir_to_allow)) |
|
|
else: |
|
|
print(f"关键警告:图片基础目录 '{image_base_dir_to_allow}' 不存在或非目录。") |
|
|
|
|
|
if os.path.exists(PERSISTENT_STORAGE_BASE) and os.path.isdir(PERSISTENT_STORAGE_BASE): |
|
|
allowed_paths_list.append(os.path.abspath(PERSISTENT_STORAGE_BASE)) |
|
|
else: |
|
|
print(f"警告:持久化存储基础目录 '{PERSISTENT_STORAGE_BASE}' 不存在。JSON历史文件下载可能受影响。") |
|
|
try: |
|
|
os.makedirs(PERSISTENT_STORAGE_BASE, exist_ok=True) |
|
|
print(f"信息:已尝试创建目录 '{PERSISTENT_STORAGE_BASE}'。") |
|
|
if os.path.exists(PERSISTENT_STORAGE_BASE) and os.path.isdir(PERSISTENT_STORAGE_BASE): |
|
|
allowed_paths_list.append(os.path.abspath(PERSISTENT_STORAGE_BASE)) |
|
|
except Exception as e_mkdir_main: |
|
|
print(f"错误:在 main 中创建目录 '{PERSISTENT_STORAGE_BASE}' 失败: {e_mkdir_main}") |
|
|
|
|
|
final_allowed_paths = list(set(allowed_paths_list)) |
|
|
if final_allowed_paths: |
|
|
print(f"Gradio demo.launch() 配置最终 allowed_paths: {final_allowed_paths}") |
|
|
else: |
|
|
print("警告:没有有效的 allowed_paths 被配置。Gradio文件访问可能受限。") |
|
|
|
|
|
print("启动 Gradio 应用...") |
|
|
if final_allowed_paths: |
|
|
demo.launch(allowed_paths=final_allowed_paths) |
|
|
else: |
|
|
demo.launch() |