tdoa.py now tailored towards voice, tdoa_sharp_noises.py for clicks and whistles
parent
40a17acda9
commit
2415377e44
|
|
@ -0,0 +1,117 @@
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
# --- Input stream format ---------------------------------------------------
FIFO_PATH = "/tmp/esp32_audio"  # named pipe the audio is read from
SAMPLE_RATE = 16000             # Hz
CHANNELS = 2                    # samples are interleaved: even index = left, odd = right
BYTES_PER_SAMPLE = 2            # samples are decoded as int16

# --- Voice detection tuning ------------------------------------------------
BLOCK_FRAMES = 4096  # frames per processed block (~256 ms at 16 kHz), sized for speech segments
BAND_LOW = 300       # Hz, lower edge of the band kept before localization
BAND_HIGH = 3000     # Hz, upper edge of the band kept before localization
ALPHA = 0.005        # EMA weight of the newest block in the RMS baseline (slow adaptation)
MARGIN = 2.0         # a block must exceed baseline RMS times this factor to trigger
COOLDOWN = 0.7       # seconds after a reported event during which retriggers are suppressed

# --- Array geometry --------------------------------------------------------
MIC_DISTANCE = 0.13     # meters between the two microphones
SPEED_OF_SOUND = 343.0  # m/s
|
||||||
|
|
||||||
|
def read_block(f, block_bytes):
    """Read one block of ``block_bytes`` bytes from ``f`` and decode it as int16.

    Args:
        f: Binary file-like object exposing ``read(n)``.
        block_bytes: Number of bytes that make up one complete block.

    Returns:
        A 1-D ``np.int16`` array viewing the bytes that were read, or ``None``
        when the read came back empty or shorter than ``block_bytes``.
    """
    raw = f.read(block_bytes)
    # Only a complete, non-empty block is usable; anything else is reported as None.
    if raw and len(raw) >= block_bytes:
        return np.frombuffer(raw, dtype=np.int16)
    return None
|
||||||
|
|
||||||
|
def bandpass_fft(x, fs, low, high):
    """Band-pass ``x`` by zeroing every FFT bin outside ``[low, high]`` Hz.

    Args:
        x: 1-D real-valued NumPy array.
        fs: Sample rate of ``x`` in Hz.
        low: Lowest frequency (Hz) to keep, inclusive.
        high: Highest frequency (Hz) to keep, inclusive.

    Returns:
        The filtered signal, same length as ``x`` and cast back to ``x.dtype``.
    """
    count = len(x)
    spectrum = np.fft.rfft(x)
    bin_freqs = np.fft.rfftfreq(count, d=1.0 / fs)
    # Boolean pass-band mask; multiplying by it zeroes the rejected bins.
    keep = np.logical_and(bin_freqs >= low, bin_freqs <= high)
    return np.fft.irfft(spectrum * keep, n=count).astype(x.dtype)
|
||||||
|
|
||||||
|
def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """Estimate the delay of ``sig`` relative to ``refsig`` with GCC-PHAT.

    Args:
        sig: 1-D real-valued array (first channel).
        refsig: 1-D real-valued array (reference channel).
        fs: Sample rate in Hz.
        max_tau: Largest delay magnitude (seconds) to search. Defaults to
            ``MIC_DISTANCE / SPEED_OF_SOUND``.
        interp: Integer upsampling factor applied to the correlation.

    Returns:
        ``(tau, confidence)``. ``tau`` is the delay in seconds and is positive
        when ``sig`` lags ``refsig``. ``confidence`` is the correlation peak
        divided by the mean absolute correlation inside the searched window.
    """
    # Zero-pad to the combined length so the circular correlation does not wrap.
    n = sig.shape[0] + refsig.shape[0]
    SIG = np.fft.rfft(sig, n=n)
    REFSIG = np.fft.rfft(refsig, n=n)

    # PHAT weighting: keep only the phase of the cross-spectrum.
    R = SIG * np.conj(REFSIG)
    R = R / (np.abs(R) + 1e-15)
    cc = np.fft.irfft(R, n=(interp * n))

    if max_tau is None:
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND

    # Never search more lags than the correlation actually contains.
    lag_limit = (cc.shape[0] - 1) // 2
    max_shift = max(0, min(int(interp * fs * max_tau), lag_limit))

    # irfft puts lag 0 at index 0 and negative lags at the END of the array,
    # so the window [-max_shift, +max_shift] is the tail followed by the head.
    # (Slicing around the array midpoint would inspect the largest lags instead.)
    if max_shift > 0:
        cc = np.concatenate((cc[-max_shift:], cc[:max_shift + 1]))
    else:
        # cc[-0:] would select the whole array, so handle the zero-lag-only case.
        cc = cc[:1]

    shift = int(np.argmax(cc)) - max_shift
    tau = shift / float(interp * fs)

    # confidence: peak vs average correlation magnitude
    peak_val = np.max(cc)
    avg_val = np.mean(np.abs(cc))
    confidence = peak_val / (avg_val + 1e-9)

    return tau, confidence
|
||||||
|
|
||||||
|
def tau_to_angle(tau, mic_distance, speed_of_sound):
    """Convert a time difference of arrival into an angle in degrees.

    Args:
        tau: Delay between the two microphones, in seconds.
        mic_distance: Microphone spacing, in meters.
        speed_of_sound: Propagation speed, in m/s.

    Returns:
        ``degrees(arcsin(tau * speed_of_sound / mic_distance))`` with the
        arcsin argument limited to [-1, 1], so the result lies in [-90, 90].
    """
    ratio = (tau * speed_of_sound) / mic_distance
    # Limit to the arcsin domain; delays beyond the spacing map to +/-90 degrees.
    if not ratio < 1.0:
        ratio = 1.0
    elif not ratio > -1.0:
        ratio = -1.0
    return np.degrees(np.arcsin(ratio))
|
||||||
|
|
||||||
|
def main():
    """Read stereo blocks from the FIFO and print a line for each voice event.

    Per block: compute the RMS over both channels, update an exponential
    moving-average baseline, and skip the block unless the cooldown has
    expired and the RMS exceeds ``baseline * MARGIN``. Surviving blocks are
    band-passed and passed to ``gcc_phat``; the event is printed only when the
    returned confidence exceeds 2.0. Runs until interrupted.
    """
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    max_tau = MIC_DISTANCE / SPEED_OF_SOUND  # loop-invariant search limit
    baseline = None
    last_trigger = 0.0

    with open(FIFO_PATH, "rb") as f:
        print("Listening for voice events (RMS + GCC-PHAT + confidence)...")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # read_block returns None on an empty or short read. Retrying
                # immediately would spin at full CPU, so back off briefly.
                time.sleep(0.05)
                continue

            # De-interleave and widen once: float32 avoids the int16 pitfall
            # where abs(-32768) stays negative, and is reused by every step below.
            left = audio[0::2].astype(np.float32)
            right = audio[1::2].astype(np.float32)

            # RMS energy across both channels
            rms = np.sqrt(np.mean((left ** 2 + right ** 2) / 2))

            if baseline is None:
                # First block only seeds the baseline.
                baseline = rms
                continue

            baseline = (1 - ALPHA) * baseline + ALPHA * rms
            threshold = baseline * MARGIN

            now = time.time()
            if now - last_trigger < COOLDOWN:
                continue

            if rms <= threshold:
                continue

            # Band-pass filter to voice band
            l_bp = bandpass_fft(left, SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(right, SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA + confidence
            tau, confidence = gcc_phat(l_bp, r_bp, SAMPLE_RATE,
                                       max_tau=max_tau, interp=4)
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Only report strong detections
            if confidence > 2.0:
                ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                louder = "LEFT" if np.max(np.abs(left)) > np.max(np.abs(right)) else "RIGHT"
                print(f"[{ts}] Voice event: {louder} louder | RMS={rms:.1f}, baseline={baseline:.1f}, "
                      f"TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° | conf={confidence:.2f}")
                # NOTE(review): the original nesting of this line was not
                # recoverable; the cooldown is armed only for reported events.
                last_trigger = now
|
||||||
|
|
||||||
|
# Start the listener only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||||
|
|
@ -0,0 +1,141 @@
|
||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
# --- Input stream format ---------------------------------------------------
FIFO_PATH = "/tmp/esp32_audio"  # named pipe the audio is read from
SAMPLE_RATE = 16000             # Hz
CHANNELS = 2                    # samples are interleaved: even index = left, odd = right
BYTES_PER_SAMPLE = 2            # samples are decoded as int16

# --- Impulse detection and processing tuning -------------------------------
BLOCK_FRAMES = 2048   # frames per processed block (~128 ms at 16 kHz), large enough to catch an impulse
IMPULSE_WINDOW = 256  # samples kept around the detected peak for GCC-PHAT
BAND_LOW = 1000       # Hz, lower edge of the band kept before localization
BAND_HIGH = 4000      # Hz, upper edge of the band kept before localization
MARGIN = 3.0          # a peak must exceed the rolling baseline times this factor to trigger
ALPHA = 0.01          # EMA weight of the newest block in the rolling baseline
COOLDOWN = 0.5        # seconds after an event during which retriggers (echoes) are suppressed

# --- Array geometry --------------------------------------------------------
MIC_DISTANCE = 0.20     # meters between the two microphones
SPEED_OF_SOUND = 343.0  # m/s
|
||||||
|
|
||||||
|
def read_block(f, block_bytes):
    """Fetch a single fixed-size block from ``f`` as int16 samples.

    Args:
        f: Binary file-like object exposing ``read(n)``.
        block_bytes: Size of one complete block, in bytes.

    Returns:
        A 1-D ``np.int16`` array over the bytes read, or ``None`` if the read
        returned nothing or fewer than ``block_bytes`` bytes.
    """
    chunk = f.read(block_bytes)
    is_complete = bool(chunk) and len(chunk) >= block_bytes
    return np.frombuffer(chunk, dtype=np.int16) if is_complete else None
|
||||||
|
|
||||||
|
def bandpass_fft(x, fs, low, high):
    """Simple FFT band-pass: zero out bins outside [low, high].

    Args:
        x: 1-D real-valued NumPy array.
        fs: Sample rate of ``x`` in Hz.
        low: Lower pass-band edge in Hz (inclusive).
        high: Upper pass-band edge in Hz (inclusive).

    Returns:
        Filtered signal with the same length as ``x``, cast to ``x.dtype``.
    """
    length = len(x)
    coeffs = np.fft.rfft(x)
    hz = np.fft.rfftfreq(length, d=1.0 / fs)
    passband = (low <= hz) & (hz <= high)
    restored = np.fft.irfft(coeffs * passband, n=length)
    return restored.astype(x.dtype)
|
||||||
|
|
||||||
|
def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """
    GCC-PHAT lag estimation between sig and refsig.

    Args:
        sig: 1-D real-valued array (first channel).
        refsig: 1-D real-valued array (reference channel).
        fs: Sample rate in Hz.
        max_tau: Largest delay magnitude (seconds) to search. Defaults to
            ``MIC_DISTANCE / SPEED_OF_SOUND``.
        interp: Integer upsampling factor applied to the correlation.

    Returns time delay (tau) in seconds; positive when sig lags refsig.
    """
    # Zero-pad to the combined length so the circular correlation does not wrap.
    n = sig.shape[0] + refsig.shape[0]

    # FFT
    SIG = np.fft.rfft(sig, n=n)
    REFSIG = np.fft.rfft(refsig, n=n)

    # PHAT weighting: keep only the phase of the cross-spectrum.
    R = SIG * np.conj(REFSIG)
    denom = np.abs(R)
    R = R / (denom + 1e-15)
    cc = np.fft.irfft(R, n=(interp * n))

    if max_tau is None:
        # physical max tau based on mic distance
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND

    # Never search more lags than the correlation actually contains.
    lag_limit = (cc.shape[0] - 1) // 2
    max_shift = max(0, min(int(interp * fs * max_tau), lag_limit))

    # irfft puts lag 0 at index 0 and negative lags at the END of the array,
    # so the window [-max_shift, +max_shift] is the tail followed by the head.
    # (Slicing around the array midpoint would inspect the largest lags instead.)
    if max_shift > 0:
        cc = np.concatenate((cc[-max_shift:], cc[:max_shift + 1]))
    else:
        # cc[-0:] would select the whole array, so handle the zero-lag-only case.
        cc = cc[:1]

    shift = int(np.argmax(cc)) - max_shift
    tau = shift / float(interp * fs)
    return tau
|
||||||
|
|
||||||
|
def tau_to_angle(tau, mic_distance, speed_of_sound):
    """
    Convert time difference to angle (-90..+90) assuming linear 2-mic array and far-field.

    Args:
        tau: Delay between the two microphones, in seconds.
        mic_distance: Microphone spacing, in meters.
        speed_of_sound: Propagation speed, in m/s.

    Returns:
        Angle in degrees, computed as arcsin of the clamped path-length ratio.
    """
    sine = (tau * speed_of_sound) / mic_distance
    # clamp sin argument to [-1,1]
    capped = sine if sine < 1.0 else 1.0
    clamped = capped if capped > -1.0 else -1.0
    return np.degrees(np.arcsin(clamped))
|
||||||
|
|
||||||
|
def main():
    """Read stereo blocks from the FIFO and print a line for each loud impulse.

    Per block: take the per-channel peak magnitude, update an exponential
    moving-average baseline of the mean peak, and skip the block unless the
    cooldown has expired and the larger peak exceeds ``baseline * MARGIN``.
    For surviving blocks, a window of up to ``IMPULSE_WINDOW`` samples around
    the loudest sample is band-passed and passed to ``gcc_phat`` to obtain the
    delay, which is converted to an angle and printed. Runs until interrupted.
    """
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    # cap max_tau to physical limit to avoid spurious peaks (loop-invariant)
    max_tau = MIC_DISTANCE / SPEED_OF_SOUND
    half = IMPULSE_WINDOW // 2
    baseline = None
    last_trigger = 0.0

    with open(FIFO_PATH, "rb") as f:
        print("Listening: GCC-PHAT + impulse window + band-pass + cooldown")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # read_block returns None on an empty or short read. Retrying
                # immediately would spin at full CPU, so back off briefly.
                time.sleep(0.05)
                continue

            # Split stereo and widen to int32. In int16, abs(-32768) stays
            # negative and |left| + |right| wraps for loud samples, which would
            # corrupt both the peak levels and the impulse location.
            left = audio[0::2].astype(np.int32)
            right = audio[1::2].astype(np.int32)
            left_mag = np.abs(left)
            right_mag = np.abs(right)

            # Compute per-block peak level for rolling baseline
            left_peak = int(left_mag.max())
            right_peak = int(right_mag.max())
            current_level = (left_peak + right_peak) / 2.0

            if baseline is None:
                # First block only seeds the baseline.
                baseline = current_level
                continue
            baseline = (1 - ALPHA) * baseline + ALPHA * current_level
            threshold = baseline * MARGIN

            # Cooldown gate
            now = time.time()
            if now - last_trigger < COOLDOWN:
                continue

            # Impulse gate: only proceed if strong spike above rolling threshold
            if max(left_peak, right_peak) <= threshold:
                continue

            # Find impulse index using combined magnitude
            peak_idx = int(np.argmax(left_mag + right_mag))

            # Window around impulse for robust localization (clipped to the block)
            start = max(0, peak_idx - half)
            end = min(len(left), peak_idx + half)
            l_win = left[start:end]
            r_win = right[start:end]

            # Band-pass to 1–4 kHz to reduce low rumble/high hiss
            l_bp = bandpass_fft(l_win.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(r_win.astype(np.float32), SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA
            tau = gcc_phat(l_bp, r_bp, SAMPLE_RATE, max_tau=max_tau, interp=1)
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Timestamp and report
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            louder = "LEFT" if left_peak > right_peak else "RIGHT"
            print(f"[{ts}] Loud impulse: {louder} louder | TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° "
                  f"(baseline={baseline:.1f}, L={left_peak}, R={right_peak})")

            # Arm cooldown
            last_trigger = now
|
||||||
|
|
||||||
|
# Start the listener only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
||||||
Loading…
Reference in New Issue