# ===== File: tdoa.py (added by this diff as a new file, mode 100644) =====
"""Voice-event detector with two-microphone direction estimation.

Reads interleaved 16-bit stereo samples from ``FIFO_PATH``, gates blocks on
RMS energy relative to a slowly adapting baseline, and estimates the
inter-channel time difference of arrival (TDOA) with GCC-PHAT.
"""
import time
from datetime import datetime

import numpy as np

FIFO_PATH = "/tmp/esp32_audio"
SAMPLE_RATE = 16000
CHANNELS = 2
BYTES_PER_SAMPLE = 2

# Voice-specific params
BLOCK_FRAMES = 4096        # ~256 ms @16k, good for speech segments
BAND_LOW = 300             # Hz
BAND_HIGH = 3000           # Hz
ALPHA = 0.005              # slower baseline adaptation
MARGIN = 2.0               # multiplier above baseline RMS
COOLDOWN = 0.7             # seconds; suppress retriggers
MIN_CONFIDENCE = 2.0       # minimum peak/average correlation ratio to report
EOF_POLL_INTERVAL = 0.05   # seconds to wait when no full block is available

# Geometry
MIC_DISTANCE = 0.13        # meters between microphones
SPEED_OF_SOUND = 343.0     # m/s


def read_block(f, block_bytes):
    """Read one block of ``block_bytes`` bytes from ``f`` as int16 samples.

    Returns a 1-D ``np.int16`` array, or ``None`` when fewer than
    ``block_bytes`` bytes were returned (including an empty read); the
    partial data is discarded in that case.
    """
    data = f.read(block_bytes)
    if len(data) < block_bytes:
        return None
    return np.frombuffer(data, dtype=np.int16)


def bandpass_fft(x, fs, low, high):
    """Band-pass ``x`` by zeroing FFT bins outside ``[low, high]`` Hz.

    ``fs`` is the sample rate in Hz. The result has the same length and
    dtype as ``x``.
    """
    x = np.asarray(x)
    n = len(x)
    spectrum = np.fft.rfft(x)
    freqs = np.fft.rfftfreq(n, d=1.0 / fs)
    spectrum[(freqs < low) | (freqs > high)] = 0
    return np.fft.irfft(spectrum, n=n).astype(x.dtype)


def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """Estimate the delay of ``sig`` relative to ``refsig`` with GCC-PHAT.

    Args:
        sig, refsig: 1-D sample arrays.
        fs: sample rate in Hz.
        max_tau: largest delay magnitude (seconds) to search; defaults to
            ``MIC_DISTANCE / SPEED_OF_SOUND``.
        interp: integer upsampling factor applied to the correlation.

    Returns:
        ``(tau, confidence)``: ``tau`` is the delay in seconds (positive
        when ``sig`` lags ``refsig``); ``confidence`` is the correlation
        peak divided by the mean absolute correlation inside the search
        window.
    """
    # Zero-pad to the combined length so the correlation is linear, not circular.
    n = sig.shape[0] + refsig.shape[0]
    cross = np.fft.rfft(sig, n=n) * np.conj(np.fft.rfft(refsig, n=n))
    # PHAT weighting: keep only the phase of every bin.
    cross = cross / (np.abs(cross) + 1e-15)
    cc = np.fft.irfft(cross, n=interp * n)

    if max_tau is None:
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND

    max_shift = min(int(interp * fs * max_tau), (interp * n) // 2)
    # irfft puts zero lag at index 0 and negative lags at the end of the
    # array, so the +/-max_shift window is the tail followed by the head.
    # (Explicit start index: ``cc[-0:]`` would select the whole array.)
    cc = np.concatenate((cc[cc.shape[0] - max_shift:], cc[:max_shift + 1]))
    shift = np.argmax(cc) - max_shift
    tau = shift / float(interp * fs)

    # confidence: peak vs average correlation magnitude
    peak_val = np.max(cc)
    avg_val = np.mean(np.abs(cc))
    confidence = peak_val / (avg_val + 1e-9)

    return tau, confidence


def tau_to_angle(tau, mic_distance, speed_of_sound):
    """Convert a time difference (seconds) to an angle in degrees.

    Computes ``arcsin(tau * speed_of_sound / mic_distance)`` with the
    argument clamped to [-1, 1], so the result lies in [-90, +90].
    """
    arg = np.clip((tau * speed_of_sound) / mic_distance, -1.0, 1.0)
    return np.degrees(np.arcsin(arg))


def block_rms(left, right):
    """Return the RMS level of a stereo block, averaged over both channels."""
    l_sq = left.astype(np.float64) ** 2
    r_sq = right.astype(np.float64) ** 2
    return float(np.sqrt(np.mean((l_sq + r_sq) / 2)))


def peak_abs(x):
    """Return max(|x|) as a Python int.

    Samples are widened to int32 first: ``np.abs`` on int16 wraps for
    -32768 and would report a negative peak.
    """
    return int(np.max(np.abs(x.astype(np.int32))))


def main():
    """Run the detection loop on ``FIFO_PATH`` until interrupted."""
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    max_tau = MIC_DISTANCE / SPEED_OF_SOUND
    baseline = None
    last_trigger = float("-inf")  # the first event is never in cooldown

    with open(FIFO_PATH, "rb") as f:
        print("Listening for voice events (RMS + GCC-PHAT + confidence)...")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # No full block available (e.g. EOF): wait instead of
                # spinning at 100% CPU on repeated empty reads.
                time.sleep(EOF_POLL_INTERVAL)
                continue

            left = audio[0::CHANNELS]
            right = audio[1::CHANNELS]

            # RMS energy across both channels
            rms = block_rms(left, right)

            if baseline is None:
                baseline = rms
                continue

            baseline = (1 - ALPHA) * baseline + ALPHA * rms
            threshold = baseline * MARGIN

            # Monotonic clock: cooldown is unaffected by wall-clock changes.
            now = time.monotonic()
            if now - last_trigger < COOLDOWN:
                continue

            if rms <= threshold:
                continue

            # Band-pass filter to voice band
            l_bp = bandpass_fft(left.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(right.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA + confidence
            tau, confidence = gcc_phat(l_bp, r_bp, SAMPLE_RATE, max_tau=max_tau, interp=4)
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Only report strong detections
            if confidence > MIN_CONFIDENCE:
                ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                louder = "LEFT" if peak_abs(left) > peak_abs(right) else "RIGHT"
                print(f"[{ts}] Voice event: {louder} louder | RMS={rms:.1f}, baseline={baseline:.1f}, "
                      f"TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° | conf={confidence:.2f}")
                last_trigger = now


if __name__ == "__main__":
    main()


# ===== File: tdoa_sharp_noises.py (added by this diff as a new file, mode 100644) =====
# NOTE: everything from here on belongs to a separate script, not to tdoa.py.
import time
from datetime import datetime

import numpy as np

FIFO_PATH = "/tmp/esp32_audio"
SAMPLE_RATE = 16000
CHANNELS = 2
BYTES_PER_SAMPLE = 2

# Detection and processing params
BLOCK_FRAMES = 2048        # ~128 ms @16k; large enough to catch an impulse
IMPULSE_WINDOW = 256       # samples around the detected peak for GCC-PHAT
BAND_LOW = 1000            # Hz
BAND_HIGH = 4000           # Hz
MARGIN = 3.0               # multiplier above rolling baseline for impulse detection
ALPHA = 0.01               # rolling baseline EMA smoothing
COOLDOWN = 0.5             # seconds; suppress retriggers from echoes
EOF_POLL_INTERVAL = 0.05   # seconds to wait when no full block is available

# Geometry
MIC_DISTANCE = 0.20        # meters between microphones
SPEED_OF_SOUND = 343.0     # m/s


def read_block(f, block_bytes):
    """Read one block of ``block_bytes`` bytes from ``f`` as int16 samples.

    Returns a 1-D ``np.int16`` array, or ``None`` when fewer than
    ``block_bytes`` bytes were returned (including an empty read); the
    partial data is discarded in that case.
    """
    data = f.read(block_bytes)
    if len(data) < block_bytes:
        return None
    return np.frombuffer(data, dtype=np.int16)


def bandpass_fft(x, fs, low, high):
    """Simple FFT band-pass: zero out bins outside [low, high].

    ``fs`` is the sample rate in Hz. The result has the same length and
    dtype as ``x``.
    """
    x = np.asarray(x)
    n = len(x)
    spectrum = np.fft.rfft(x)
    freqs = np.fft.rfftfreq(n, d=1.0 / fs)
    spectrum[(freqs < low) | (freqs > high)] = 0
    return np.fft.irfft(spectrum, n=n).astype(x.dtype)


def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """
    GCC-PHAT lag estimation between sig and refsig.

    Args:
        sig, refsig: 1-D sample arrays.
        fs: sample rate in Hz.
        max_tau: largest delay magnitude (seconds) to search; defaults to
            ``MIC_DISTANCE / SPEED_OF_SOUND``.
        interp: integer upsampling factor applied to the correlation.

    Returns time delay (tau) in seconds, positive when ``sig`` lags
    ``refsig``.
    """
    # Zero-pad to the combined length so the correlation is linear, not circular.
    n = sig.shape[0] + refsig.shape[0]
    cross = np.fft.rfft(sig, n=n) * np.conj(np.fft.rfft(refsig, n=n))
    # PHAT weighting: keep only the phase of every bin.
    cross = cross / (np.abs(cross) + 1e-15)
    cc = np.fft.irfft(cross, n=interp * n)

    if max_tau is None:
        # physical max tau based on mic distance
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND

    max_shift = min(int(interp * fs * max_tau), (interp * n) // 2)
    # irfft puts zero lag at index 0 and negative lags at the end of the
    # array, so the +/-max_shift window is the tail followed by the head.
    # (Explicit start index: ``cc[-0:]`` would select the whole array.)
    cc = np.concatenate((cc[cc.shape[0] - max_shift:], cc[:max_shift + 1]))
    shift = np.argmax(cc) - max_shift
    return shift / float(interp * fs)


def tau_to_angle(tau, mic_distance, speed_of_sound):
    """
    Convert time difference to angle (-90..+90) assuming linear 2-mic array and far-field.
    """
    # clamp sin argument to [-1,1]
    arg = np.clip((tau * speed_of_sound) / mic_distance, -1.0, 1.0)
    return np.degrees(np.arcsin(arg))


def peak_abs(x):
    """Return max(|x|) as a Python int.

    Samples are widened to int32 first: ``np.abs`` on int16 wraps for
    -32768 and would report a negative peak.
    """
    return int(np.max(np.abs(x.astype(np.int32))))


def find_impulse_index(left, right):
    """Return the sample index where |left| + |right| is largest.

    The sum is computed in int32; in int16 two loud samples
    (e.g. 20000 + 20000) wrap negative and hide the true peak.
    """
    combined = np.abs(left.astype(np.int32)) + np.abs(right.astype(np.int32))
    return int(np.argmax(combined))


def impulse_bounds(peak_idx, n_samples, window):
    """Return ``(start, end)`` of a ``window``-sample slice around ``peak_idx``.

    The slice is centred on the peak where possible and shifted (not
    shortened) near the block edges; it is shortened only when the block
    itself has fewer than ``window`` samples.
    """
    window = min(window, n_samples)
    start = min(max(0, peak_idx - window // 2), n_samples - window)
    return start, start + window


def main():
    """Run the impulse-detection loop on ``FIFO_PATH`` until interrupted."""
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    # cap max_tau to physical limit to avoid spurious peaks
    max_tau = MIC_DISTANCE / SPEED_OF_SOUND
    baseline = None
    last_trigger = float("-inf")  # the first event is never in cooldown

    with open(FIFO_PATH, "rb") as f:
        print("Listening: GCC-PHAT + impulse window + band-pass + cooldown")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # No full block available (e.g. EOF): wait instead of
                # spinning at 100% CPU on repeated empty reads.
                time.sleep(EOF_POLL_INTERVAL)
                continue

            # Split stereo
            left = audio[0::CHANNELS]
            right = audio[1::CHANNELS]

            # Compute per-block peak level for rolling baseline
            left_peak = peak_abs(left)
            right_peak = peak_abs(right)
            current_level = (left_peak + right_peak) / 2.0

            if baseline is None:
                baseline = current_level
                continue
            baseline = (1 - ALPHA) * baseline + ALPHA * current_level
            threshold = baseline * MARGIN

            # Cooldown gate (monotonic clock: immune to wall-clock changes)
            now = time.monotonic()
            if now - last_trigger < COOLDOWN:
                continue

            # Impulse gate: only proceed if strong spike above rolling threshold
            if max(left_peak, right_peak) <= threshold:
                continue

            # Window around impulse for robust localization
            peak_idx = find_impulse_index(left, right)
            start, end = impulse_bounds(peak_idx, len(left), IMPULSE_WINDOW)
            l_win = left[start:end]
            r_win = right[start:end]

            # Band-pass to 1-4 kHz to reduce low rumble/high hiss
            l_bp = bandpass_fft(l_win.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(r_win.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA
            tau = gcc_phat(l_bp, r_bp, SAMPLE_RATE, max_tau=max_tau, interp=1)
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Timestamp and report
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            louder = "LEFT" if left_peak > right_peak else "RIGHT"
            print(f"[{ts}] Loud impulse: {louder} louder | TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° "
                  f"(baseline={baseline:.1f}, L={left_peak}, R={right_peak})")

            # Arm cooldown
            last_trigger = now


if __name__ == "__main__":
    main()