# ecflow / preprocessing.py
# Author: Bing Yan
# Commit ad63797: Switch TPD to Summary-ECFlow model (21-dim hand-crafted features)
"""
Preprocessing utilities for ECFlow web app.
Handles:
- CSV/NPZ parsing for both CV and TPD data
- Physical-to-dimensionless unit conversion for CV (Compton convention)
- Formal potential estimation
- Diffusion coefficient estimation via Randles-Sevcik
"""
import io
import numpy as np
# Physical constants
F_CONST = 96485.3329  # Faraday constant (C/mol)
R_CONST = 8.314462  # Gas constant (J/(mol·K))


# =========================================================================
# CV nondimensionalization
# =========================================================================
def nondimensionalize_cv(E_volts, i_amps, v_Vs, E0_V, T_K=298.15,
                         A_cm2=0.0707, C_A_molcm3=1e-6, D_A_cm2s=1e-5, n=1,
                         v_ref_Vs=0.1):
    """
    Convert a physical CV curve into the dimensionless units expected by
    the ECFlow model (Compton convention).

    With the scan-rate-dependent diffusion length d = sqrt(D·RT/(nFv)):

        θ = (E - E₀) / (RT/nF)     dimensionless potential
        ψ = i / (nFAC·D/d)         dimensionless current (flux)

    σ = v / v_ref is returned as an explicit conditioning variable: in the
    Compton convention σ ≡ 1 by construction (d absorbs v), but the model
    uses σ to distinguish experiments at different scan rates; choose
    v_ref so that σ spans the training range (~0.1–100).

    Args:
        E_volts: potential array (V)
        i_amps: current array (A)
        v_Vs: scan rate (V/s)
        E0_V: formal potential (V)
        T_K: temperature (K)
        A_cm2: electrode area (cm²)
        C_A_molcm3: bulk concentration (mol/cm³)
        D_A_cm2s: diffusion coefficient (cm²/s)
        n: number of electrons
        v_ref_Vs: reference scan rate (V/s) at which σ = 1

    Returns:
        (theta, flux, sigma): float32 arrays θ and ψ, and float σ.
    """
    volt_unit = R_CONST * T_K / (n * F_CONST)  # thermal voltage RT/nF (V)
    theta = (E_volts - E0_V) / volt_unit
    # Diffusion length d = sqrt(D·RT/(nFv)) fixes the current scale.
    diff_len = np.sqrt(D_A_cm2s * R_CONST * T_K / (n * F_CONST * v_Vs))
    current_unit = n * F_CONST * A_cm2 * C_A_molcm3 * D_A_cm2s / diff_len
    flux = i_amps / current_unit
    return theta.astype(np.float32), flux.astype(np.float32), float(v_Vs / v_ref_Vs)
def estimate_E0(E, i):
    """
    Estimate the formal potential as the midpoint of the anodic and
    cathodic peak potentials of a CV curve:

        E0 ≈ (E_pa + E_pc) / 2

    where E_pa is the potential at the global current maximum and E_pc
    the potential at the global current minimum.

    Args:
        E: potential array (V)
        i: current array (A)

    Returns:
        E0 estimate (V)
    """
    E = np.asarray(E)
    i = np.asarray(i)
    # Anodic peak = global current maximum; cathodic peak = global minimum.
    # (Previous half-split anodic/cathodic locals were dead code and removed.)
    E_pa = E[np.argmax(i)]
    E_pc = E[np.argmin(i)]
    return float((E_pa + E_pc) / 2.0)
def estimate_D_randles_sevcik(i_peak_A, v_Vs, A_cm2, C_molcm3, n=1, T_K=298.15):
    """
    Estimate the diffusion coefficient by inverting the Randles-Sevcik
    equation:

        i_p = 0.4463 * n^(3/2) * F^(3/2) * A * C * sqrt(D * v / (R * T))

    Args:
        i_peak_A: peak current (A)
        v_Vs: scan rate (V/s)
        A_cm2: electrode area (cm^2)
        C_molcm3: concentration (mol/cm^3)
        n: number of electrons
        T_K: temperature (K)

    Returns:
        D estimate (cm^2/s), clamped to at least 1e-10; falls back to
        1e-5 when the inputs are degenerate (zero prefactor or v <= 0).
    """
    prefactor = 0.4463 * n**1.5 * F_CONST**1.5 * A_cm2 * C_molcm3
    # Degenerate inputs would divide by ~zero or produce nonsense; return
    # a typical aqueous-diffusion default instead.
    if abs(prefactor) < 1e-30 or v_Vs <= 0:
        return 1e-5
    sqrt_term = abs(i_peak_A) / prefactor  # equals sqrt(D * v / (R * T))
    D_est = sqrt_term**2 * R_CONST * T_K / v_Vs
    return max(float(D_est), 1e-10)
# =========================================================================
# CSV parsing
# =========================================================================
def parse_cv_csv(file_content, delimiter=None):
    """
    Parse a CV CSV file with flexible column detection.

    Expected columns: potential (V or mV) and current (A, mA, uA, nA).
    Optionally includes a time column (s) used to infer the scan rate.
    Column names and units are auto-detected from the header row; bare
    one-letter names ("e", "i") must match the header exactly, longer
    patterns match as substrings.

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected from ',', tab, ';' if None)

    Returns:
        dict with 'E_V' (potential in V), 'i_A' (current in A),
        and optionally 'scan_rate_Vs' (V/s) if a time column is present.

    Raises:
        ValueError: if there are no data rows, the potential/current
            columns cannot be identified, or fewer than 5 valid data
            points are found.
    """
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")
    lines = file_content.strip().split("\n")
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")
    if delimiter is None:
        for d in [",", "\t", ";"]:
            if d in lines[0]:
                delimiter = d
                break
    if delimiter is None:
        delimiter = ","
    header = [h.strip().lower() for h in lines[0].split(delimiter)]
    e_col, i_col, t_col = None, None, None
    e_scale, i_scale = 1.0, 1.0
    time_patterns = ["time/s", "time (s)", "time/ms", "time (ms)",
                     "elapsed time", "t/s", "t (s)", "time"]
    # (pattern, multiplier-to-SI) pairs, most specific first.
    potential_patterns = [
        ("e/v", 1.0), ("e (v)", 1.0), ("potential/v", 1.0), ("potential (v)", 1.0),
        ("ewe/v", 1.0), ("working electrode", 1.0),
        ("e/mv", 1e-3), ("e (mv)", 1e-3), ("potential/mv", 1e-3), ("potential (mv)", 1e-3),
        ("voltage", 1.0), ("e", 1.0), ("potential", 1.0),
    ]
    current_patterns = [
        ("i/a", 1.0), ("i (a)", 1.0), ("current/a", 1.0), ("current (a)", 1.0),
        ("<i>/ma", 1e-3),
        ("i/ma", 1e-3), ("i (ma)", 1e-3), ("current/ma", 1e-3), ("current (ma)", 1e-3),
        ("i/ua", 1e-6), ("i (ua)", 1e-6), ("i/µa", 1e-6), ("i (µa)", 1e-6),
        ("current/ua", 1e-6), ("current/µa", 1e-6),
        ("i/na", 1e-9), ("i (na)", 1e-9),
        ("current", 1.0), ("i", 1.0),
    ]

    def _matches(pat, col):
        # Bare one-letter patterns ("e", "i") are far too greedy as
        # substrings ("i" occurs inside "potential/v", "e" inside
        # "current"), which previously could map both series onto the
        # same column. Require an exact header match for them; longer
        # patterns keep substring matching.
        return col == pat if len(pat) == 1 else pat in col

    for idx, col in enumerate(header):
        if t_col is None:
            for pat in time_patterns:
                if pat in col:
                    t_col = idx
                    break
        if t_col == idx:
            continue  # the time column must not double as E or i
        if e_col is None:
            for pat, scale in potential_patterns:
                if _matches(pat, col):
                    e_col, e_scale = idx, scale
                    break
        if i_col is None:
            for pat, scale in current_patterns:
                if _matches(pat, col):
                    i_col, i_scale = idx, scale
                    break
    if e_col is None or i_col is None:
        # Fallback: first two non-time columns are assumed to be E, i.
        non_time = [idx for idx in range(len(header)) if idx != t_col]
        if len(non_time) >= 2:
            e_col, i_col = non_time[0], non_time[1]
        else:
            raise ValueError(
                f"Cannot identify potential/current columns from header: {header}"
            )
    all_cols = {e_col, i_col}
    if t_col is not None:
        all_cols.add(t_col)
    max_col = max(all_cols)
    E_vals, i_vals, t_vals = [], [], []
    for line in lines[1:]:
        parts = line.strip().split(delimiter)
        if len(parts) <= max_col:
            continue  # short row: skip rather than fail
        try:
            E_vals.append(float(parts[e_col]) * e_scale)
            i_vals.append(float(parts[i_col]) * i_scale)
            if t_col is not None:
                t_vals.append(float(parts[t_col]))
        except ValueError:
            continue  # non-numeric row (e.g. units line): skip
    if len(E_vals) < 5:
        raise ValueError(f"Only {len(E_vals)} valid data points found")
    result = {
        "E_V": np.array(E_vals, dtype=np.float32),
        "i_A": np.array(i_vals, dtype=np.float32),
    }
    if t_vals:
        # Infer scan rate as the median |dE/dt| over the first half of the
        # sweep (assumed to be a single forward segment — TODO confirm for
        # multi-cycle files).
        t_arr = np.array(t_vals, dtype=np.float64)
        E_arr = np.array(E_vals, dtype=np.float64)
        mid = len(E_arr) // 2
        dE = np.abs(np.diff(E_arr[:mid]))
        dt = np.abs(np.diff(t_arr[:mid]))
        valid = dt > 1e-12
        if valid.sum() > 10:
            v = float(np.median(dE[valid] / dt[valid]))
            if v > 1e-6:
                result["scan_rate_Vs"] = v
    return result
def parse_tpd_csv(file_content, delimiter=None):
    """
    Parse a TPD CSV file.

    Expected columns: temperature (K or °C) and signal (arb. units).
    Optionally includes a time column (s) used to infer the heating rate.

    Celsius is detected from explicit unit markers in the column name
    ("°c", "(c)", "/c", ...) or, failing that, from the value range: a
    maximum below 200 is assumed to be Celsius and shifted by +273.15.

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected from ',', tab, ';' if None)

    Returns:
        dict with 'T_K' (temperature in K), 'signal' (arb. units),
        and optionally 'beta_Ks' (heating rate in K/s) if time is available.

    Raises:
        ValueError: if there are no data rows, columns cannot be
            identified, or fewer than 5 valid data points are found.
    """
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")
    lines = file_content.strip().split("\n")
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")
    if delimiter is None:
        for d in [",", "\t", ";"]:
            if d in lines[0]:
                delimiter = d
                break
    if delimiter is None:
        delimiter = ","
    header = [h.strip().lower() for h in lines[0].split(delimiter)]
    t_col, s_col, time_col = None, None, None
    is_celsius = False
    temp_patterns = [
        ("temperature", False), ("temp", False), ("t/k", False), ("t (k)", False),
        ("t/c", True), ("t (c)", True), ("t/°c", True), ("t (°c)", True),
    ]
    # Unit markers that indicate Celsius even when a generic pattern like
    # "temp" matched first (previously "temp (°c)" was treated as Kelvin
    # whenever values exceeded 200). NOTE: "/c" could in principle match
    # an unrelated suffix, but temperature headers rarely contain one.
    celsius_markers = ("°c", "(c)", "/c", "deg c", "celsius")
    signal_patterns = ["signal", "rate", "intensity", "des", "tpd"]
    time_patterns = ["time/s", "time (s)", "time"]
    for idx, col in enumerate(header):
        if t_col is None:
            for pat, celsius in temp_patterns:
                if pat in col:
                    t_col = idx
                    is_celsius = celsius or any(m in col for m in celsius_markers)
                    break
        if s_col is None:
            for pat in signal_patterns:
                if pat in col:
                    s_col = idx
                    break
        if time_col is None:
            for pat in time_patterns:
                if pat in col:
                    time_col = idx
                    break
    if t_col is None or s_col is None:
        # Fallback: assume the first two columns are temperature, signal.
        if len(header) >= 2:
            t_col, s_col = 0, 1
        else:
            raise ValueError(
                f"Cannot identify temperature/signal columns from header: {header}"
            )
    all_cols = {t_col, s_col}
    if time_col is not None:
        all_cols.add(time_col)
    max_col = max(all_cols)
    T_vals, s_vals, time_vals = [], [], []
    for line in lines[1:]:
        parts = line.strip().split(delimiter)
        if len(parts) <= max_col:
            continue  # short row: skip rather than fail
        try:
            T_vals.append(float(parts[t_col]))
            s_vals.append(float(parts[s_col]))
            if time_col is not None:
                time_vals.append(float(parts[time_col]))
        except ValueError:
            continue  # non-numeric row: skip
    if len(T_vals) < 5:
        raise ValueError(f"Only {len(T_vals)} valid data points found")
    T_arr = np.array(T_vals, dtype=np.float32)
    # Heuristic fallback: plausible TPD temperatures in K exceed 200.
    if is_celsius or T_arr.max() < 200:
        T_arr += 273.15
    result = {
        "T_K": T_arr,
        "signal": np.array(s_vals, dtype=np.float32),
    }
    if time_vals:
        time_arr = np.array(time_vals, dtype=np.float32)
        dt = time_arr[-1] - time_arr[0]
        dT = T_arr[-1] - T_arr[0]
        if dt > 0:
            # Mean heating rate over the whole ramp (assumes a linear ramp).
            result["beta_Ks"] = float(dT / dt)
    return result
def parse_dimensionless_cv_csv(file_content, delimiter=None):
    """
    Parse a CSV that already contains dimensionless CV data.

    Expected columns: theta (dimensionless potential) and flux
    (dimensionless current). Column names are matched case-insensitively;
    bare one-letter names ("e", "i", "j") must match the header exactly,
    longer patterns match as substrings.

    Args:
        file_content: string or bytes of CSV content
        delimiter: CSV delimiter (auto-detected from ',', tab, ';' if None)

    Returns:
        dict with 'theta', 'flux' float32 arrays

    Raises:
        ValueError: if there are no data rows, columns cannot be
            identified, or fewer than 2 valid data points are found.
    """
    if isinstance(file_content, bytes):
        file_content = file_content.decode("utf-8", errors="replace")
    lines = file_content.strip().split("\n")
    if len(lines) < 2:
        raise ValueError("CSV must have at least a header and one data row")
    if delimiter is None:
        for d in [",", "\t", ";"]:
            if d in lines[0]:
                delimiter = d
                break
    if delimiter is None:
        delimiter = ","
    header = [h.strip().lower() for h in lines[0].split(delimiter)]

    def _matches(pat, col):
        # One-letter patterns are too greedy as substrings ("i" occurs
        # inside "potential", which previously mapped flux onto the theta
        # column), so require exact equality for them.
        return col == pat if len(pat) == 1 else pat in col

    t_col, f_col = None, None
    for idx, col in enumerate(header):
        if t_col is None and any(_matches(p, col) for p in ["theta", "potential", "e"]):
            t_col = idx
        if f_col is None and any(_matches(p, col) for p in ["flux", "current", "j", "i"]):
            f_col = idx
    if t_col is None or f_col is None:
        # Fallback: assume the first two columns are theta, flux.
        if len(header) >= 2:
            t_col, f_col = 0, 1
        else:
            raise ValueError(f"Cannot identify columns from header: {header}")
    theta_vals, flux_vals = [], []
    for line in lines[1:]:
        parts = line.strip().split(delimiter)
        if len(parts) <= max(t_col, f_col):
            continue  # short row: skip rather than fail
        try:
            theta_vals.append(float(parts[t_col]))
            flux_vals.append(float(parts[f_col]))
        except ValueError:
            continue  # non-numeric row: skip
    if len(theta_vals) < 2:
        raise ValueError(f"Only {len(theta_vals)} valid data points found")
    return {
        "theta": np.array(theta_vals, dtype=np.float32),
        "flux": np.array(flux_vals, dtype=np.float32),
    }
# ── TPD summary feature extraction ──────────────────────────────────
MAX_HEATING_RATES = 3
TPD_FEATURES_PER_RATE = 6
TPD_SUMMARY_DIM = MAX_HEATING_RATES * TPD_FEATURES_PER_RATE + MAX_HEATING_RATES  # 21


def extract_tpd_summary_stats(temperature, rate, lengths, heating_rates, n_rates):
    """Extract a 21-dim hand-crafted summary vector from raw TPD data.

    For each of up to MAX_HEATING_RATES curves, six features are stored:
    normalized peak rate, peak temperature, full width at half maximum,
    normalized total desorption integral, left/right asymmetry of the
    half-maximum window, and log10 of the absolute peak rate. The final
    MAX_HEATING_RATES slots hold log10(heating_rate) per curve; unused
    slots stay zero.

    Args:
        temperature: [N, T] array of temperatures (K)
        rate: [N, T] array of desorption rates
        lengths: [N] array of valid lengths per curve
        heating_rates: [N] array of heating rates (K/s)
        n_rates: number of heating rates actually present

    Returns:
        1-D float32 array of shape (21,)
    """
    out = np.zeros(TPD_SUMMARY_DIM, dtype=np.float32)
    for k in range(min(n_rates, MAX_HEATING_RATES)):
        npts = int(lengths[k])
        T_seg = temperature[k, :npts]
        r_seg = rate[k, :npts]
        abs_peak = np.max(np.abs(r_seg)) + 1e-30  # guard against all-zero curves
        r_max = np.max(r_seg)
        T_at_peak = T_seg[np.argmax(r_seg)]
        # Full width at half maximum and the skew of that window
        # relative to the peak position.
        mask = r_seg >= r_max / 2.0
        if np.any(mask):
            hits = np.where(mask)[0]
            fwhm = T_seg[hits[-1]] - T_seg[hits[0]]
            left_span = T_at_peak - T_seg[hits[0]]
            right_span = T_seg[hits[-1]] - T_at_peak
            skew = (right_span - left_span) / (fwhm + 1e-30)
        else:
            fwhm = 0.0
            skew = 0.0
        if npts > 1:
            # np.trapz was renamed to np.trapezoid in NumPy 2.0.
            area = (np.trapezoid(r_seg, T_seg)
                    if hasattr(np, 'trapezoid') else np.trapz(r_seg, T_seg))
        else:
            area = 0.0
        base = k * TPD_FEATURES_PER_RATE
        out[base + 0] = r_max / abs_peak
        out[base + 1] = T_at_peak
        out[base + 2] = fwhm
        out[base + 3] = area / (abs_peak * (T_seg.max() - T_seg.min()) + 1e-30)
        out[base + 4] = skew
        out[base + 5] = np.log10(abs_peak)
        out[MAX_HEATING_RATES * TPD_FEATURES_PER_RATE + k] = np.log10(heating_rates[k])
    return out