serial_audio_catcher/tdoa_sharp_noises.py

142 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import numpy as np
import time
from datetime import datetime
# --- Input stream -----------------------------------------------------------
# Path that main() opens in binary mode to read raw audio blocks.
FIFO_PATH = "/tmp/esp32_audio"
# Sampling rate in Hz.
SAMPLE_RATE = 16_000
# Interleaved channel count (main() de-interleaves with a stride of 2).
CHANNELS = 2
# Sample width in bytes (blocks are decoded as int16).
BYTES_PER_SAMPLE = 2

# --- Detection and processing params ----------------------------------------
# Frames per block: ~128 ms @16k; large enough to catch an impulse.
BLOCK_FRAMES = 2048
# Samples kept around the detected peak for GCC-PHAT.
IMPULSE_WINDOW = 256
# Band-pass edges in Hz.
BAND_LOW = 1000
BAND_HIGH = 4000
# Multiplier above the rolling baseline required for impulse detection.
MARGIN = 3.0
# EMA smoothing factor for the rolling baseline.
ALPHA = 0.01
# Seconds to suppress retriggers (e.g. from echoes) after a detection.
COOLDOWN = 0.5

# --- Geometry ----------------------------------------------------------------
# Distance between the two microphones, in meters.
MIC_DISTANCE = 0.2
# Speed of sound, in m/s.
SPEED_OF_SOUND = 343.0
def read_block(f, block_bytes):
    """Read one fixed-size block from *f* and view it as 16-bit samples.

    Args:
        f: Binary file-like object exposing ``read(n)``.
        block_bytes: Exact number of bytes that make up one block.

    Returns:
        A 1-D ``np.int16`` array over the bytes read, or ``None`` when the
        read came back empty or shorter than ``block_bytes``.
    """
    raw = f.read(block_bytes)
    # Only a complete block is usable; empty and truncated reads are dropped.
    if raw and len(raw) >= block_bytes:
        return np.frombuffer(raw, dtype=np.int16)
    return None
def bandpass_fft(x, fs, low, high):
    """Band-pass *x* by zeroing FFT bins outside ``[low, high]``.

    Args:
        x: 1-D real NumPy array of samples.
        fs: Sampling rate in Hz.
        low: Lower pass-band edge in Hz (inclusive).
        high: Upper pass-band edge in Hz (inclusive).

    Returns:
        The filtered signal, same length as *x* and cast back to ``x.dtype``.
    """
    num_samples = len(x)
    spectrum = np.fft.rfft(x)
    bin_freqs = np.fft.rfftfreq(num_samples, d=1.0 / fs)
    # Boolean mask of the bins to keep; multiplying by it zeroes the rest.
    in_band = np.logical_and(bin_freqs >= low, bin_freqs <= high)
    restored = np.fft.irfft(spectrum * in_band, n=num_samples)
    return restored.astype(x.dtype)
def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """Estimate the delay of ``sig`` relative to ``refsig`` using GCC-PHAT.

    The cross-power spectrum is normalised to unit magnitude (phase
    transform), transformed back to the lag domain, and the lag with the
    largest correlation inside ``[-max_tau, +max_tau]`` is returned.

    Args:
        sig: 1-D array of samples from the first channel.
        refsig: 1-D array of samples from the reference channel.
        fs: Sampling rate in Hz.
        max_tau: Largest delay magnitude to search, in seconds. When
            ``None``, ``MIC_DISTANCE / SPEED_OF_SOUND`` is used.
        interp: Integer factor applied to the inverse-FFT length, giving a
            lag resolution of ``1 / (interp * fs)`` seconds.

    Returns:
        The estimated delay in seconds; positive when ``sig`` is delayed
        relative to ``refsig``.
    """
    # Zero-pad to the combined length so the circular correlation does not
    # wrap one signal onto the other.
    n = sig.shape[0] + refsig.shape[0]

    sig_spec = np.fft.rfft(sig, n=n)
    ref_spec = np.fft.rfft(refsig, n=n)
    cross = sig_spec * np.conj(ref_spec)
    # PHAT weighting: keep phase only. The epsilon avoids division by zero
    # on empty bins.
    cross = cross / (np.abs(cross) + 1e-15)

    cc = np.fft.irfft(cross, n=(interp * n))
    # irfft returns zero lag at index 0 with negative lags wrapped to the
    # tail. Centre zero lag first so that a window around the middle really
    # covers lags around zero.
    cc = np.fft.fftshift(cc)
    mid = cc.shape[0] // 2

    if max_tau is None:
        # physical max tau based on mic distance
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND
    max_shift = int(interp * fs * max_tau)
    # Keep the search window inside the correlation array.
    max_shift = max(0, min(max_shift, mid))

    window = cc[mid - max_shift: mid + max_shift + 1]
    shift = np.argmax(window) - max_shift
    tau = shift / float(interp * fs)
    return tau
def tau_to_angle(tau, mic_distance, speed_of_sound):
    """Map a time difference of arrival to a bearing in degrees.

    Assumes a linear two-microphone array and a far-field source, so the
    result lies in -90..+90 degrees.

    Args:
        tau: Time difference between the two channels, in seconds.
        mic_distance: Spacing between the microphones, in meters.
        speed_of_sound: Propagation speed, in m/s.

    Returns:
        The arcsine of ``tau * speed_of_sound / mic_distance``, in degrees,
        with the ratio saturated to ``[-1, 1]`` first.
    """
    sine = tau * speed_of_sound / mic_distance
    # Saturate so that a delay slightly beyond the physical limit does not
    # push arcsin outside its domain.
    if not sine < 1.0:
        sine = 1.0
    elif not sine > -1.0:
        sine = -1.0
    return np.degrees(np.arcsin(sine))
def main():
    """Listen on the audio FIFO and report the bearing of loud impulses.

    Reads fixed-size blocks of interleaved stereo int16 samples from
    ``FIFO_PATH`` and keeps an exponential moving average of the per-block
    peak level. When a block's peak exceeds ``MARGIN`` times that baseline
    and the cooldown has elapsed, a window around the loudest sample is
    band-passed, the inter-channel delay is estimated with GCC-PHAT, and a
    timestamped line with the delay and angle is printed.

    Runs until interrupted; returns nothing.
    """
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    half = IMPULSE_WINDOW // 2
    # Cap the delay search at the physical limit to avoid spurious peaks.
    max_tau = MIC_DISTANCE / SPEED_OF_SOUND
    baseline = None
    # -inf guarantees the first impulse is never swallowed by the cooldown,
    # whatever origin the monotonic clock has.
    last_trigger = float("-inf")

    with open(FIFO_PATH, "rb") as f:
        print("Listening: GCC-PHAT + impulse window + band-pass + cooldown")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # No complete block is available (typically the writer has
                # gone away). Back off briefly instead of spinning at full
                # CPU while waiting for more data.
                time.sleep(0.05)
                continue

            # Split stereo and widen to int32. In int16, abs(-32768) wraps
            # back to -32768 and the peak/magnitude sums below overflow for
            # loud signals.
            left = audio[0::2].astype(np.int32)
            right = audio[1::2].astype(np.int32)
            left_mag = np.abs(left)
            right_mag = np.abs(right)

            # Per-block peak level feeds the rolling baseline.
            left_peak = np.max(left_mag)
            right_peak = np.max(right_mag)
            current_level = (left_peak + right_peak) / 2.0
            if baseline is None:
                # The first block only seeds the baseline.
                baseline = current_level
                continue
            baseline = (1 - ALPHA) * baseline + ALPHA * current_level
            threshold = baseline * MARGIN

            # Cooldown gate. A monotonic clock keeps the interval immune to
            # wall-clock adjustments.
            now = time.monotonic()
            if now - last_trigger < COOLDOWN:
                continue

            # Impulse gate: only proceed on a strong spike above the
            # rolling threshold.
            if max(left_peak, right_peak) <= threshold:
                continue

            # Locate the impulse using the combined magnitude.
            peak_idx = int(np.argmax(left_mag + right_mag))

            # Window around the impulse (truncated at the block edges).
            start = max(0, peak_idx - half)
            end = min(len(left), peak_idx + half)
            l_win = left[start:end]
            r_win = right[start:end]

            # Band-pass to 1-4 kHz to reduce low rumble/high hiss.
            l_bp = bandpass_fft(l_win.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(r_win.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA, then convert the delay to a bearing.
            tau = gcc_phat(l_bp, r_bp, SAMPLE_RATE, max_tau=max_tau, interp=1)
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Timestamp (millisecond precision) and report.
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            louder = "LEFT" if left_peak > right_peak else "RIGHT"
            print(f"[{ts}] Loud impulse: {louder} louder | TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° "
                  f"(baseline={baseline:.1f}, L={left_peak}, R={right_peak})")

            # Arm cooldown.
            last_trigger = now


if __name__ == "__main__":
    main()