| """ |
| Preprocessing utilities for ECFlow web app. |
| |
| Handles: |
| - CSV/NPZ parsing for both CV and TPD data |
| - Physical-to-dimensionless unit conversion for CV (Compton convention) |
| - Formal potential estimation |
| - Diffusion coefficient estimation via Randles-Sevcik |
| """ |
|
|
| import io |
| import numpy as np |
|
|
| |
# Physical constants (CODATA values).
F_CONST = 96485.3329  # Faraday constant (C/mol)
R_CONST = 8.314462  # molar gas constant (J/(mol·K))
|
|
|
|
| |
| |
| |
|
|
def nondimensionalize_cv(E_volts, i_amps, v_Vs, E0_V, T_K=298.15,
                         A_cm2=0.0707, C_A_molcm3=1e-6, D_A_cm2s=1e-5, n=1,
                         v_ref_Vs=0.1):
    """
    Map a physical CV sweep onto the dimensionless variables used by ECFlow.

    Follows the Compton convention with the scan-rate-dependent diffusion
    length d = sqrt(D·RT/(nFv)):
        θ = (E - E₀)·nF / RT
        ψ = i·d / (nFAC·D)

    The dimensionless scan rate σ = v / v_ref is reported separately. Within
    the Compton convention σ ≡ 1 by construction (d already absorbs v), but
    the ECFlow model consumes σ as an explicit conditioning input so it can
    tell apart experiments run at different scan rates; choose v_ref so that
    σ spans the training range (~0.1–100).

    Args:
        E_volts: potential array (V)
        i_amps: current array (A)
        v_Vs: scan rate (V/s)
        E0_V: formal potential (V)
        T_K: temperature (K)
        A_cm2: electrode area (cm²)
        C_A_molcm3: bulk concentration (mol/cm³)
        D_A_cm2s: diffusion coefficient (cm²/s)
        n: number of electrons
        v_ref_Vs: reference scan rate (V/s) at which σ = 1

    Returns:
        theta: dimensionless potential array (float32)
        flux: dimensionless current array (float32)
        sigma: dimensionless scan rate (= v_Vs / v_ref_Vs)
    """
    nF = n * F_CONST
    RT = R_CONST * T_K

    # θ: potential measured from E₀ in units of the thermal voltage RT/nF.
    theta = (E_volts - E0_V) * nF / RT

    # ψ: current in units of the diffusion-limited flux scale nFAC·D/d.
    diff_length = np.sqrt(D_A_cm2s * RT / (nF * v_Vs))
    flux = i_amps * diff_length / (nF * A_cm2 * C_A_molcm3 * D_A_cm2s)

    sigma = float(v_Vs / v_ref_Vs)
    return theta.astype(np.float32), flux.astype(np.float32), sigma
|
|
|
|
def estimate_E0(E, i):
    """
    Estimate the formal potential as the midpoint of the CV peak potentials.

    E0 ≈ (E_pa + E_pc) / 2, where E_pa and E_pc are the potentials at the
    global current maximum (anodic peak) and minimum (cathodic peak).

    Args:
        E: potential array (V)
        i: current array (A), same length as E

    Returns:
        E0 estimate (V)

    Raises:
        ValueError: if the inputs are empty or their shapes differ.
    """
    E = np.asarray(E)
    i = np.asarray(i)
    if E.size == 0 or E.shape != i.shape:
        raise ValueError("E and i must be non-empty arrays of equal length")

    # Peak positions over the full sweep; the previous half-sweep split
    # (i_anodic/i_cathodic) was dead code and has been removed.
    E_pa = E[np.argmax(i)]  # anodic peak potential
    E_pc = E[np.argmin(i)]  # cathodic peak potential

    return float((E_pa + E_pc) / 2.0)
|
|
|
|
def estimate_D_randles_sevcik(i_peak_A, v_Vs, A_cm2, C_molcm3, n=1, T_K=298.15):
    """
    Invert the Randles-Sevcik relation to estimate a diffusion coefficient.

        i_p = 0.4463 · n^(3/2) · F^(3/2) · A · C · sqrt(D·v / (R·T))
        =>  D = (|i_p| / (0.4463 · n^(3/2) · F^(3/2) · A · C))² · R·T / v

    Degenerate inputs (non-positive scan rate, or a vanishing A·C prefactor)
    fall back to a typical aqueous value of 1e-5; the result is clamped at
    1e-10 from below.

    Args:
        i_peak_A: peak current (A)
        v_Vs: scan rate (V/s)
        A_cm2: electrode area (cm^2)
        C_molcm3: concentration (mol/cm^3)
        n: number of electrons
        T_K: temperature (K)

    Returns:
        D estimate (cm^2/s)
    """
    prefactor = 0.4463 * (n * F_CONST) ** 1.5 * A_cm2 * C_molcm3
    if v_Vs <= 0 or abs(prefactor) < 1e-30:
        return 1e-5
    sqrt_term = abs(i_peak_A) / prefactor  # equals sqrt(D·v / (R·T))
    D_est = sqrt_term * sqrt_term * R_CONST * T_K / v_Vs
    return max(float(D_est), 1e-10)
|
|
|
|
| |
| |
| |
|
|
def parse_cv_csv(file_content, delimiter=None):
    """
    Parse a CV CSV file with flexible column detection.

    Expected columns: potential (V or mV) and current (A, mA, uA, nA).
    Optionally includes a time column (s) to infer the scan rate.
    Auto-detects column names and units from the header.

    Columns are matched in two passes -- exact header match first, then
    substring match -- and no column is ever assigned more than one role.
    (A single substring pass mis-assigned headers such as "potential,current":
    "i" is a substring of "potential", so the potential column was claimed
    as the current column as well.)

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected if None)

    Returns:
        dict with 'E_V' (potential in V), 'i_A' (current in A),
        and optionally 'scan_rate_Vs' (V/s) if time is available.

    Raises:
        ValueError: if there is no data row, the potential/current columns
            cannot be identified, or fewer than 5 valid rows parse.
    """
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")

    lines = file_content.strip().split("\n")
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")

    # Auto-detect the delimiter from the header line.
    if delimiter is None:
        for d in [",", "\t", ";"]:
            if d in lines[0]:
                delimiter = d
                break
        if delimiter is None:
            delimiter = ","

    header = [h.strip().lower() for h in lines[0].split(delimiter)]

    e_col, i_col, t_col = None, None, None
    e_scale, i_scale = 1.0, 1.0

    time_patterns = ["time/s", "time (s)", "time/ms", "time (ms)",
                     "elapsed time", "t/s", "t (s)", "time"]

    # (pattern, scale-to-SI) pairs, most specific first.
    potential_patterns = [
        ("e/v", 1.0), ("e (v)", 1.0), ("potential/v", 1.0), ("potential (v)", 1.0),
        ("ewe/v", 1.0), ("working electrode", 1.0),
        ("e/mv", 1e-3), ("e (mv)", 1e-3), ("potential/mv", 1e-3), ("potential (mv)", 1e-3),
        ("voltage", 1.0), ("e", 1.0), ("potential", 1.0),
    ]
    current_patterns = [
        ("i/a", 1.0), ("i (a)", 1.0), ("current/a", 1.0), ("current (a)", 1.0),
        ("<i>/ma", 1e-3),
        ("i/ma", 1e-3), ("i (ma)", 1e-3), ("current/ma", 1e-3), ("current (ma)", 1e-3),
        ("i/ua", 1e-6), ("i (ua)", 1e-6), ("i/µa", 1e-6), ("i (µa)", 1e-6),
        ("current/ua", 1e-6), ("current/µa", 1e-6),
        ("i/na", 1e-9), ("i (na)", 1e-9),
        ("current", 1.0), ("i", 1.0),
    ]

    # Pass 1: exact matches win before loose substring matches, so a generic
    # pattern like "e" or "i" cannot steal a column whose header exactly
    # names another role (e.g. "current"). Pass 2: substring fallback.
    for exact in (True, False):
        for idx, col in enumerate(header):
            if idx in (t_col, e_col, i_col):
                continue  # this column already has a role
            if t_col is None and any(
                    (pat == col) if exact else (pat in col)
                    for pat in time_patterns):
                t_col = idx
                continue
            if e_col is None:
                found = False
                for pat, scale in potential_patterns:
                    if (pat == col) if exact else (pat in col):
                        e_col, e_scale = idx, scale
                        found = True
                        break
                if found:
                    continue
            if i_col is None:
                for pat, scale in current_patterns:
                    if (pat == col) if exact else (pat in col):
                        i_col, i_scale = idx, scale
                        break

    # Fallback: fill any still-missing role from the leftover columns in
    # order (without clobbering a role that was already identified).
    if e_col is None or i_col is None:
        leftovers = [idx for idx in range(len(header))
                     if idx not in (t_col, e_col, i_col)]
        if e_col is None and leftovers:
            e_col = leftovers.pop(0)
        if i_col is None and leftovers:
            i_col = leftovers.pop(0)
        if e_col is None or i_col is None:
            raise ValueError(
                f"Cannot identify potential/current columns from header: {header}"
            )

    needed_cols = {e_col, i_col}
    if t_col is not None:
        needed_cols.add(t_col)
    max_col = max(needed_cols)

    E_vals, i_vals, t_vals = [], [], []
    for line in lines[1:]:
        parts = line.strip().split(delimiter)
        if len(parts) <= max_col:
            continue  # short row: skip
        try:
            E_vals.append(float(parts[e_col]) * e_scale)
            i_vals.append(float(parts[i_col]) * i_scale)
            if t_col is not None:
                t_vals.append(float(parts[t_col]))
        except ValueError:
            continue  # non-numeric row (e.g. embedded units line): skip

    if len(E_vals) < 5:
        raise ValueError(f"Only {len(E_vals)} valid data points found")

    result = {
        "E_V": np.array(E_vals, dtype=np.float32),
        "i_A": np.array(i_vals, dtype=np.float32),
    }

    # Infer the scan rate as the median |dE/dt| over the first half of the
    # sweep, which avoids the sign flip at the vertex potential.
    if t_vals:
        t_arr = np.array(t_vals, dtype=np.float64)
        E_arr = np.array(E_vals, dtype=np.float64)
        mid = len(E_arr) // 2
        dE = np.abs(np.diff(E_arr[:mid]))
        dt = np.abs(np.diff(t_arr[:mid]))
        valid = dt > 1e-12
        if valid.sum() > 10:
            v = float(np.median(dE[valid] / dt[valid]))
            if v > 1e-6:
                result["scan_rate_Vs"] = v

    return result
|
|
|
|
def parse_tpd_csv(file_content, delimiter=None):
    """
    Parse a TPD CSV file.

    Expected columns: temperature (K or °C) and signal (arb. units).
    Optionally includes a time column (s) to infer the heating rate.

    Unit handling: an explicit "t/c"-style header forces a °C→K conversion
    and an explicit "t/k"-style header suppresses it; only when the header
    does not state the unit (e.g. just "temperature") are values with
    max < 200 assumed to be °C. (Previously the max<200 heuristic also
    overrode an explicit Kelvin header, corrupting cryogenic TPD data.)

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected if None)

    Returns:
        dict with 'T_K' (temperature in K), 'signal' (arb. units),
        and optionally 'beta_Ks' (heating rate in K/s) if time is available.

    Raises:
        ValueError: if there is no data row, the columns cannot be
            identified, or fewer than 5 valid rows parse.
    """
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")

    lines = file_content.strip().split("\n")
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")

    # Auto-detect the delimiter from the header line.
    if delimiter is None:
        for d in [",", "\t", ";"]:
            if d in lines[0]:
                delimiter = d
                break
        if delimiter is None:
            delimiter = ","

    header = [h.strip().lower() for h in lines[0].split(delimiter)]

    t_col, s_col, time_col = None, None, None
    temp_unit = None  # "K", "C", or None (unknown -> fall back to heuristic)

    # (pattern, unit); unit is None when the header does not state it.
    temp_patterns = [
        ("temperature", None), ("temp", None), ("t/k", "K"), ("t (k)", "K"),
        ("t/c", "C"), ("t (c)", "C"), ("t/°c", "C"), ("t (°c)", "C"),
    ]
    signal_patterns = ["signal", "rate", "intensity", "des", "tpd"]
    time_patterns = ["time/s", "time (s)", "time"]

    # Two passes (exact match, then substring), never giving one column two
    # roles (e.g. "desorption temperature" matches both "temp" and "des").
    for exact in (True, False):
        for idx, col in enumerate(header):
            if idx in (t_col, s_col, time_col):
                continue  # this column already has a role
            if t_col is None:
                found = False
                for pat, unit in temp_patterns:
                    if (pat == col) if exact else (pat in col):
                        t_col, temp_unit = idx, unit
                        found = True
                        break
                if found:
                    continue
            if s_col is None and any(
                    (pat == col) if exact else (pat in col)
                    for pat in signal_patterns):
                s_col = idx
                continue
            if time_col is None and any(
                    (pat == col) if exact else (pat in col)
                    for pat in time_patterns):
                time_col = idx

    # Fallback: first two columns are temperature and signal.
    if t_col is None or s_col is None:
        if len(header) >= 2:
            t_col, s_col = 0, 1
        else:
            raise ValueError(
                f"Cannot identify temperature/signal columns from header: {header}"
            )

    needed_cols = {t_col, s_col}
    if time_col is not None:
        needed_cols.add(time_col)
    max_col = max(needed_cols)

    T_vals, s_vals, time_vals = [], [], []
    for line in lines[1:]:
        parts = line.strip().split(delimiter)
        if len(parts) <= max_col:
            continue  # short row: skip
        try:
            T_vals.append(float(parts[t_col]))
            s_vals.append(float(parts[s_col]))
            if time_col is not None:
                time_vals.append(float(parts[time_col]))
        except ValueError:
            continue  # non-numeric row: skip

    if len(T_vals) < 5:
        raise ValueError(f"Only {len(T_vals)} valid data points found")

    T_arr = np.array(T_vals, dtype=np.float32)
    # Convert to Kelvin on an explicit °C header, or when the unit is
    # unknown and the values are implausibly low for Kelvin.
    if temp_unit == "C" or (temp_unit is None and T_arr.max() < 200):
        T_arr += 273.15

    result = {
        "T_K": T_arr,
        "signal": np.array(s_vals, dtype=np.float32),
    }

    # Heating rate from the endpoints of a (presumed linear) temperature ramp.
    if time_vals:
        time_arr = np.array(time_vals, dtype=np.float32)
        dt = time_arr[-1] - time_arr[0]
        dT = T_arr[-1] - T_arr[0]
        if dt > 0:
            result["beta_Ks"] = float(dT / dt)

    return result
|
|
|
|
def parse_dimensionless_cv_csv(file_content, delimiter=None):
    """
    Parse a CSV that already contains dimensionless CV data.

    Expected columns: theta (dimensionless potential), flux (dimensionless
    current). Columns are matched in two passes -- exact header match first,
    then substring match -- and the same column is never used for both
    roles. (A single substring pass broke headers such as
    "potential,current", where "i" is a substring of "potential".)

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected if None)

    Returns:
        dict with 'theta', 'flux' float32 arrays

    Raises:
        ValueError: if there is no data row, the columns cannot be
            identified, or no valid numeric rows parse.
    """
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")

    lines = file_content.strip().split("\n")
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")

    # Auto-detect the delimiter from the header line.
    if delimiter is None:
        for d in [",", "\t", ";"]:
            if d in lines[0]:
                delimiter = d
                break
        if delimiter is None:
            delimiter = ","

    header = [h.strip().lower() for h in lines[0].split(delimiter)]

    theta_names = ["theta", "potential", "e"]
    flux_names = ["flux", "current", "j", "i"]

    t_col, f_col = None, None
    # Exact matches first, so a generic one-letter pattern cannot claim a
    # column whose header exactly names the other role.
    for exact in (True, False):
        for idx, col in enumerate(header):
            if idx in (t_col, f_col):
                continue  # this column already has a role
            if t_col is None and any(
                    (pat == col) if exact else (pat in col)
                    for pat in theta_names):
                t_col = idx
                continue
            if f_col is None and any(
                    (pat == col) if exact else (pat in col)
                    for pat in flux_names):
                f_col = idx

    # Fallback: first two columns are theta, flux.
    if t_col is None or f_col is None:
        if len(header) >= 2:
            t_col, f_col = 0, 1
        else:
            raise ValueError(f"Cannot identify columns from header: {header}")

    theta_vals, flux_vals = [], []
    for line in lines[1:]:
        parts = line.strip().split(delimiter)
        if len(parts) <= max(t_col, f_col):
            continue  # short row: skip
        try:
            theta_vals.append(float(parts[t_col]))
            flux_vals.append(float(parts[f_col]))
        except ValueError:
            continue  # non-numeric row: skip

    # Empty arrays would crash downstream; fail loudly here instead.
    if not theta_vals:
        raise ValueError("No valid data points found")

    return {
        "theta": np.array(theta_vals, dtype=np.float32),
        "flux": np.array(flux_vals, dtype=np.float32),
    }
|
|
|
|
| |
|
|
# Layout of the hand-crafted TPD feature vector used by the model:
# 6 per-curve summary features for up to 3 heating-rate curves, followed by
# one log10(heating rate) slot per curve -> 3*6 + 3 = 21 dimensions.
MAX_HEATING_RATES = 3  # maximum number of heating-rate curves encoded
TPD_FEATURES_PER_RATE = 6  # summary features extracted per curve
TPD_SUMMARY_DIM = MAX_HEATING_RATES * TPD_FEATURES_PER_RATE + MAX_HEATING_RATES
|
|
|
|
def extract_tpd_summary_stats(temperature, rate, lengths, heating_rates, n_rates):
    """Extract 21-dim hand-crafted summary statistics from raw TPD data.

    Per heating rate (6 features): normalized peak rate, peak temperature,
    full width at half maximum, normalized total desorption integral,
    asymmetry ratio (right minus left half-width over the full width), and
    log10 of the absolute peak rate. One log10(heating_rate) per curve is
    appended after the per-curve blocks.

    Args:
        temperature: [N, T] array of temperatures (K)
        rate: [N, T] array of desorption rates
        lengths: [N] array of valid lengths per curve
        heating_rates: [N] array of heating rates (K/s)
        n_rates: number of heating rates

    Returns:
        1-D float32 array of shape (21,)
    """
    out = np.zeros(TPD_SUMMARY_DIM, dtype=np.float32)
    # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
    _trapz = np.trapezoid if hasattr(np, 'trapezoid') else np.trapz

    for k in range(min(n_rates, MAX_HEATING_RATES)):
        npts = int(lengths[k])
        T = temperature[k, :npts]
        y = rate[k, :npts]
        y_abs_max = np.max(np.abs(y)) + 1e-30  # guard against all-zero curves

        pk = np.argmax(y)
        y_pk = y[pk]
        T_pk = T[pk]

        # Width and asymmetry of the region at or above half the peak.
        mask = y >= y_pk / 2.0
        if np.any(mask):
            hits = np.flatnonzero(mask)
            fwhm = T[hits[-1]] - T[hits[0]]
            skew = ((T[hits[-1]] - T_pk) - (T_pk - T[hits[0]])) / (fwhm + 1e-30)
        else:
            fwhm, skew = 0.0, 0.0

        area = _trapz(y, T) if npts > 1 else 0.0

        base = k * TPD_FEATURES_PER_RATE
        out[base + 0] = y_pk / y_abs_max
        out[base + 1] = T_pk
        out[base + 2] = fwhm
        out[base + 3] = area / (y_abs_max * (T.max() - T.min()) + 1e-30)
        out[base + 4] = skew
        out[base + 5] = np.log10(y_abs_max)

        out[MAX_HEATING_RATES * TPD_FEATURES_PER_RATE + k] = np.log10(heating_rates[k])

    return out
|
|