"""
Preprocessing utilities for ECFlow web app.

Handles:
  - CSV/NPZ parsing for both CV and TPD data
  - Physical-to-dimensionless unit conversion for CV (Compton convention)
  - Formal potential estimation
  - Diffusion coefficient estimation via Randles-Sevcik
"""

import io
import re

import numpy as np

# Physical constants
F_CONST = 96485.3329  # Faraday constant (C/mol)
R_CONST = 8.314462  # Gas constant (J/(mol·K))


# =========================================================================
# CV nondimensionalization
# =========================================================================

def nondimensionalize_cv(E_volts, i_amps, v_Vs, E0_V, T_K=298.15, A_cm2=0.0707,
                         C_A_molcm3=1e-6, D_A_cm2s=1e-5, n=1, v_ref_Vs=0.1):
    """
    Convert physical CV data to dimensionless units for the ECFlow model.

    Potential and current are nondimensionalized using the Compton
    convention with the scan-rate-dependent diffusion length
    d = sqrt(D·RT/(nFv)):

        θ = (E - E₀) / (RT/nF)
        ψ = i / (nFAC·D/d)

    The dimensionless scan rate σ = v / v_ref is computed separately.
    In the Compton convention σ ≡ 1 by construction (d absorbs v), but the
    ECFlow model uses σ as an explicit conditioning variable to distinguish
    experiments at different scan rates. Setting v_ref so that σ spans the
    training range (~0.1–100) gives the model the scan-rate information.

    Args:
        E_volts: potential array or sequence (V)
        i_amps: current array or sequence (A)
        v_Vs: scan rate (V/s), must be positive
        E0_V: formal potential (V)
        T_K: temperature (K)
        A_cm2: electrode area (cm²)
        C_A_molcm3: bulk concentration (mol/cm³)
        D_A_cm2s: diffusion coefficient (cm²/s)
        n: number of electrons
        v_ref_Vs: reference scan rate (V/s) at which σ = 1, must be positive

    Returns:
        theta: dimensionless potential array (float32)
        flux: dimensionless current array (float32)
        sigma: dimensionless scan rate (= v_Vs / v_ref_Vs)

    Raises:
        ValueError: if v_Vs or v_ref_Vs is not strictly positive.
    """
    # A non-positive scan rate makes the diffusion length undefined
    # (sqrt of a negative number / division by zero); fail loudly instead of
    # returning arrays full of NaN.
    if v_Vs <= 0 or v_ref_Vs <= 0:
        raise ValueError("Scan rate and reference scan rate must be positive")

    # Accept plain Python sequences as well as arrays.
    E_volts = np.asarray(E_volts, dtype=np.float64)
    i_amps = np.asarray(i_amps, dtype=np.float64)

    thermal_voltage = R_CONST * T_K / (n * F_CONST)
    theta = (E_volts - E0_V) / thermal_voltage

    d = np.sqrt(D_A_cm2s * R_CONST * T_K / (n * F_CONST * v_Vs))
    flux_scale = n * F_CONST * A_cm2 * C_A_molcm3 * D_A_cm2s / d
    flux = i_amps / flux_scale

    sigma = v_Vs / v_ref_Vs
    return theta.astype(np.float32), flux.astype(np.float32), float(sigma)


def estimate_E0(E, i):
    """
    Estimate formal potential from CV midpoint of anodic/cathodic peaks.

    The anodic peak is taken at the global current maximum and the cathodic
    peak at the global current minimum.

    Args:
        E: potential array (V)
        i: current array (A)

    Returns:
        E0 estimate (V)

    Raises:
        ValueError: if the input is empty.
    """
    E = np.asarray(E)
    i = np.asarray(i)
    if E.size == 0 or i.size == 0:
        raise ValueError("Cannot estimate E0 from empty data")
    E_pa = E[np.argmax(i)]
    E_pc = E[np.argmin(i)]
    return float((E_pa + E_pc) / 2.0)


def estimate_D_randles_sevcik(i_peak_A, v_Vs, A_cm2, C_molcm3, n=1, T_K=298.15):
    """
    Estimate diffusion coefficient from Randles-Sevcik equation.

        i_p = 0.4463 * n^(3/2) * F^(3/2) * A * C * sqrt(D * v / (R * T))

    Args:
        i_peak_A: peak current (A)
        v_Vs: scan rate (V/s)
        A_cm2: electrode area (cm^2)
        C_molcm3: concentration (mol/cm^3)
        n: number of electrons
        T_K: temperature (K)

    Returns:
        D estimate (cm^2/s). Falls back to 1e-5 when the prefactor is
        (numerically) zero or the scan rate is not positive, and never
        returns less than 1e-10.
    """
    coeff = 0.4463 * n**1.5 * F_CONST**1.5 * A_cm2 * C_molcm3
    if abs(coeff) < 1e-30 or v_Vs <= 0:
        return 1e-5
    ratio = abs(i_peak_A) / coeff
    D = ratio**2 * R_CONST * T_K / v_Vs
    return max(float(D), 1e-10)


# =========================================================================
# CSV parsing
# =========================================================================

# Unit token that follows '/', '(' or '[' in a (lower-cased) header cell,
# e.g. 'i/ma' -> 'ma', 'current (µa)' -> 'µa', 't [°c]' -> '°c'.
_UNIT_TOKEN_RE = re.compile(r"[/(\[]\s*(°?[a-zµ]+)")

# Unit -> multiplicative factor converting to SI base (V, A, s).
_POTENTIAL_UNITS = {"v": 1.0, "mv": 1e-3, "uv": 1e-6, "µv": 1e-6}
_CURRENT_UNITS = {"a": 1.0, "ma": 1e-3, "ua": 1e-6, "µa": 1e-6,
                  "na": 1e-9, "pa": 1e-12}
_TIME_UNITS = {"s": 1.0, "sec": 1.0, "ms": 1e-3, "min": 60.0}
# Unit -> "is Celsius" flag.
_TEMPERATURE_UNITS = {"k": False, "kelvin": False,
                      "c": True, "°c": True, "degc": True, "celsius": True}


def _load_csv_table(file_content, delimiter=None):
    """Decode CSV text and return (normalized header cells, data lines, delimiter)."""
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")

    # Drop a UTF-8 byte-order mark so it does not stick to the first header cell.
    lines = file_content.lstrip("\ufeff").strip().splitlines()
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")

    if delimiter is None:
        delimiter = next((d for d in (",", "\t", ";") if d in lines[0]), ",")

    # Lower-case and map Greek mu to the micro sign so 'μA' and 'µA' agree.
    header = [h.strip().lower().replace("μ", "µ")
              for h in lines[0].split(delimiter)]
    return header, lines[1:], delimiter


def _leading_symbol(cell):
    """Return the leading run of letters of a header cell ('<i>/ma' -> 'i')."""
    text = cell.lstrip("<([ ")
    end = 0
    while end < len(text) and text[end].isalpha():
        end += 1
    return text[:end]


def _unit_lookup(cell, table):
    """Return table[unit] for the first recognized unit in a header cell, else None."""
    for token in _UNIT_TOKEN_RE.findall(cell):
        if token in table:
            return table[token]
    return None


def _symbol_matches(cell, symbols, units):
    """Check that a cell starts with one of `symbols` and carries a compatible unit."""
    if _leading_symbol(cell) not in symbols:
        return False
    if units is None:
        return True
    tokens = _UNIT_TOKEN_RE.findall(cell)
    # No unit at all is acceptable; an explicit unit must belong to this role
    # (so 't/s' is never mistaken for a temperature, nor 't/k' for a time).
    return not tokens or any(token in units for token in tokens)


def _assign_columns(header, roles):
    """Map role names to column indices; each column is claimed by at most one role.

    `roles` is a sequence of (name, keywords, symbols, units). Descriptive
    keywords (substring match) take priority over one-letter symbols, which
    must appear as the leading token of the cell.
    """
    assigned = {}

    # Pass 1: descriptive keywords such as 'potential' or 'current'.
    for idx, cell in enumerate(header):
        for name, keywords, _symbols, _units in roles:
            if name not in assigned and any(k in cell for k in keywords):
                assigned[name] = idx
                break

    # Pass 2: bare symbols such as 'e', 'i', 't' on still-unclaimed columns.
    taken = set(assigned.values())
    for idx, cell in enumerate(header):
        if idx in taken:
            continue
        for name, _keywords, symbols, units in roles:
            if name not in assigned and _symbol_matches(cell, symbols, units):
                assigned[name] = idx
                taken.add(idx)
                break
    return assigned


def _fill_missing_columns(assigned, required, header, error_message):
    """Assign still-missing required roles positionally from unclaimed columns."""
    claimed = set(assigned.values())
    free = [idx for idx in range(len(header)) if idx not in claimed]
    for name in required:
        if name in assigned:
            continue
        if not free:
            raise ValueError(error_message)
        assigned[name] = free.pop(0)


def _parse_numeric_rows(rows, delimiter, columns):
    """Return one list of floats per requested column.

    A row is kept only if every requested cell parses, so the returned lists
    always have equal length and stay aligned.
    """
    max_col = max(columns)
    parsed = [[] for _ in columns]
    for line in rows:
        # Strip only spaces/newlines: stripping tabs would shift the columns
        # of a tab-delimited row whose first or last cell is empty.
        parts = line.strip(" \r\n").split(delimiter)
        if len(parts) <= max_col:
            continue
        try:
            values = [float(parts[col]) for col in columns]
        except ValueError:
            continue
        for bucket, value in zip(parsed, values):
            bucket.append(value)
    return parsed


def parse_cv_csv(file_content, delimiter=None):
    """
    Parse a CV CSV file with flexible column detection.

    Expected columns: potential (V or mV) and current (A, mA, uA, nA).
    Optionally includes a time column (s or ms) to infer the scan rate.
    Auto-detects column names and units from header. Columns that cannot be
    identified by name are assigned positionally (potential first, then
    current) from the columns not already claimed.

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected if None)

    Returns:
        dict with 'E_V' (potential in V), 'i_A' (current in A),
        and optionally 'scan_rate_Vs' (V/s) if time is available.

    Raises:
        ValueError: if the file has no data rows, the columns cannot be
            identified, or fewer than 5 valid data points are found.
    """
    header, rows, delimiter = _load_csv_table(file_content, delimiter)

    roles = (
        ("time", ("time",), ("t",), _TIME_UNITS),
        ("potential", ("potential", "voltage", "ewe", "working electrode"),
         ("e",), _POTENTIAL_UNITS),
        ("current", ("current",), ("i",), _CURRENT_UNITS),
    )
    cols = _assign_columns(header, roles)
    _fill_missing_columns(
        cols, ("potential", "current"), header,
        f"Cannot identify potential/current columns from header: {header}",
    )

    e_col, i_col, t_col = cols["potential"], cols["current"], cols.get("time")
    e_scale = _unit_lookup(header[e_col], _POTENTIAL_UNITS) or 1.0
    i_scale = _unit_lookup(header[i_col], _CURRENT_UNITS) or 1.0

    wanted = [e_col, i_col] + ([t_col] if t_col is not None else [])
    parsed = _parse_numeric_rows(rows, delimiter, wanted)
    E_vals = [value * e_scale for value in parsed[0]]
    i_vals = [value * i_scale for value in parsed[1]]
    t_vals = parsed[2] if t_col is not None else []

    if len(E_vals) < 5:
        raise ValueError(f"Only {len(E_vals)} valid data points found")

    result = {
        "E_V": np.array(E_vals, dtype=np.float32),
        "i_A": np.array(i_vals, dtype=np.float32),
    }

    if t_vals:
        t_scale = _unit_lookup(header[t_col], _TIME_UNITS) or 1.0
        t_arr = np.array(t_vals, dtype=np.float64) * t_scale
        E_arr = np.array(E_vals, dtype=np.float64)
        # Only the first half of the record is used for the |dE/dt| median.
        mid = len(E_arr) // 2
        dE = np.abs(np.diff(E_arr[:mid]))
        dt = np.abs(np.diff(t_arr[:mid]))
        valid = dt > 1e-12
        if valid.sum() > 10:
            v = float(np.median(dE[valid] / dt[valid]))
            if v > 1e-6:
                result["scan_rate_Vs"] = v

    return result


def parse_tpd_csv(file_content, delimiter=None):
    """
    Parse a TPD CSV file.

    Expected columns: temperature (K or °C) and signal (arb. units).
    Optionally includes a time column (s) to infer the heating rate.

    The temperature unit is taken from the header when it names one
    (e.g. 'T/K', 'Temperature (°C)'). When the header gives no unit, values
    whose maximum is below 200 are assumed to be in Celsius.

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected if None)

    Returns:
        dict with 'T_K' (temperature in K), 'signal' (arb. units),
        and optionally 'beta_Ks' (heating rate in K/s) if time is available.

    Raises:
        ValueError: if the file has no data rows, the columns cannot be
            identified, or fewer than 5 valid data points are found.
    """
    header, rows, delimiter = _load_csv_table(file_content, delimiter)

    roles = (
        ("temperature", ("temperature", "temp"), ("t",), _TEMPERATURE_UNITS),
        ("signal", ("signal", "rate", "intensity", "des", "tpd"), (), None),
        ("time", ("time",), ("t",), _TIME_UNITS),
    )
    cols = _assign_columns(header, roles)
    _fill_missing_columns(
        cols, ("temperature", "signal"), header,
        f"Cannot identify temperature/signal columns from header: {header}",
    )

    t_col, s_col, time_col = cols["temperature"], cols["signal"], cols.get("time")

    # True -> Celsius, False -> Kelvin, None -> header does not say.
    temp_cell = header[t_col]
    is_celsius = _unit_lookup(temp_cell, _TEMPERATURE_UNITS)
    if is_celsius is None:
        if "celsius" in temp_cell:
            is_celsius = True
        elif "kelvin" in temp_cell:
            is_celsius = False

    wanted = [t_col, s_col] + ([time_col] if time_col is not None else [])
    parsed = _parse_numeric_rows(rows, delimiter, wanted)
    T_vals, s_vals = parsed[0], parsed[1]
    time_vals = parsed[2] if time_col is not None else []

    if len(T_vals) < 5:
        raise ValueError(f"Only {len(T_vals)} valid data points found")

    T_arr = np.array(T_vals, dtype=np.float32)
    # Fall back to the magnitude heuristic only when the header is silent;
    # an explicit Kelvin header must not be shifted (low-temperature TPD).
    if is_celsius or (is_celsius is None and T_arr.max() < 200):
        T_arr += 273.15

    result = {
        "T_K": T_arr,
        "signal": np.array(s_vals, dtype=np.float32),
    }

    if time_vals:
        time_scale = _unit_lookup(header[time_col], _TIME_UNITS) or 1.0
        time_arr = np.array(time_vals, dtype=np.float64) * time_scale
        # Heating rate from the end points of the record.
        dt = time_arr[-1] - time_arr[0]
        dT = T_arr[-1] - T_arr[0]
        if dt > 0:
            result["beta_Ks"] = float(dT / dt)

    return result


def parse_dimensionless_cv_csv(file_content, delimiter=None):
    """
    Parse a CSV that already contains dimensionless CV data.

    Expected columns: theta (dimensionless potential), flux (dimensionless
    current). Columns that cannot be identified by name are assigned
    positionally (theta first, then flux) from the unclaimed columns.

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected if None)

    Returns:
        dict with 'theta', 'flux' arrays (float32). The arrays may be empty
        if no data row parses; no minimum number of points is enforced.

    Raises:
        ValueError: if the file has no data rows or the columns cannot be
            identified.
    """
    header, rows, delimiter = _load_csv_table(file_content, delimiter)

    roles = (
        ("theta", ("theta", "potential"), ("e",), None),
        ("flux", ("flux", "current"), ("j", "i"), None),
    )
    cols = _assign_columns(header, roles)
    _fill_missing_columns(
        cols, ("theta", "flux"), header,
        f"Cannot identify columns from header: {header}",
    )

    theta_vals, flux_vals = _parse_numeric_rows(
        rows, delimiter, [cols["theta"], cols["flux"]]
    )

    return {
        "theta": np.array(theta_vals, dtype=np.float32),
        "flux": np.array(flux_vals, dtype=np.float32),
    }


# ── TPD summary feature extraction ──────────────────────────────────

MAX_HEATING_RATES = 3
TPD_FEATURES_PER_RATE = 6
TPD_SUMMARY_DIM = MAX_HEATING_RATES * TPD_FEATURES_PER_RATE + MAX_HEATING_RATES  # 21


def extract_tpd_summary_stats(temperature, rate, lengths, heating_rates, n_rates):
    """Extract 21-dim hand-crafted summary statistics from raw TPD data.

    Per heating rate (6 features): normalized peak rate, peak temperature,
    half-peak width, normalized total desorption integral, asymmetry ratio
    (left vs right half-width), log10(peak rate).
    Plus log10(heating_rate) per curve.

    Curves beyond MAX_HEATING_RATES are ignored. A curve with zero valid
    length leaves its six per-curve features at 0.

    Args:
        temperature: [N, T] array of temperatures (K)
        rate: [N, T] array of desorption rates
        lengths: [N] array of valid lengths per curve
        heating_rates: [N] array of heating rates (K/s)
        n_rates: number of heating rates

    Returns:
        1-D float32 array of shape (21,)
    """
    # Accept nested sequences as well as arrays (2-D indexing is used below).
    temperature = np.asarray(temperature)
    rate = np.asarray(rate)

    features = np.zeros(TPD_SUMMARY_DIM, dtype=np.float32)
    # np.trapezoid is the NumPy >= 2.0 name; older releases only have np.trapz.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz
    rate_block = MAX_HEATING_RATES * TPD_FEATURES_PER_RATE

    for i in range(min(n_rates, MAX_HEATING_RATES)):
        # Clamp so a zero/negative heating rate cannot inject -inf/NaN.
        features[rate_block + i] = np.log10(max(float(heating_rates[i]), 1e-30))

        L = int(lengths[i])
        if L <= 0:
            # Nothing to summarize; reductions on empty slices would raise.
            continue

        temp = temperature[i, :L]
        r = rate[i, :L]

        peak_abs = np.max(np.abs(r)) + 1e-30
        peak_rate = np.max(r)
        idx_peak = np.argmax(r)
        peak_temp = temp[idx_peak]

        # Width is measured between the first and last samples at or above
        # half of the peak rate.
        half_max = peak_rate / 2.0
        above_half = r >= half_max
        if np.any(above_half):
            indices = np.where(above_half)[0]
            half_width = temp[indices[-1]] - temp[indices[0]]
            left_width = peak_temp - temp[indices[0]]
            right_width = temp[indices[-1]] - peak_temp
            asymmetry = (right_width - left_width) / (half_width + 1e-30)
        else:
            half_width = 0.0
            asymmetry = 0.0

        integral = trapezoid(r, temp) if L > 1 else 0.0
        log_peak = np.log10(peak_abs)

        offset = i * TPD_FEATURES_PER_RATE
        features[offset + 0] = peak_rate / peak_abs
        features[offset + 1] = peak_temp
        features[offset + 2] = half_width
        features[offset + 3] = integral / (peak_abs * (temp.max() - temp.min()) + 1e-30)
        features[offset + 4] = asymmetry
        features[offset + 5] = log_peak

    return features