serial_audio_catcher/tdoa.py

118 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import numpy as np
import time
from datetime import datetime
# --- Input stream -----------------------------------------------------------
# Path opened in binary mode by main(); blocks of raw samples are read from it.
FIFO_PATH = "/tmp/esp32_audio"
SAMPLE_RATE = 16000  # Hz; passed as `fs` to the filter and to GCC-PHAT
CHANNELS = 2  # main() de-interleaves even/odd samples into left/right
BYTES_PER_SAMPLE = 2  # matches the int16 dtype used by read_block()
# --- Voice-specific params ---------------------------------------------------
BLOCK_FRAMES = 4096  # frames per analysis block; ~256 ms at 16 kHz
BAND_LOW = 300  # Hz; lower edge of the band kept by bandpass_fft()
BAND_HIGH = 3000  # Hz; upper edge of the band kept by bandpass_fft()
ALPHA = 0.005  # smoothing factor of the running RMS baseline (small = slow adaptation)
MARGIN = 2.0  # a block must exceed baseline RMS times this factor to be analysed
COOLDOWN = 0.7  # seconds after a reported event during which blocks are skipped
# --- Geometry ----------------------------------------------------------------
MIC_DISTANCE = 0.13  # meters between microphones
SPEED_OF_SOUND = 343.0  # m/s
def read_block(f, block_bytes):
    """Read one fixed-size block of 16-bit samples from ``f``.

    Args:
        f: Binary file-like object exposing ``read(n)``.
        block_bytes: Number of bytes that make up one complete block.

    Returns:
        A 1-D ``int16`` array viewing the bytes that were read, or ``None``
        when the read produced nothing or fewer than ``block_bytes`` bytes
        (a short read is discarded, not returned).
    """
    chunk = f.read(block_bytes)
    got_full_block = bool(chunk) and len(chunk) >= block_bytes
    return np.frombuffer(chunk, dtype=np.int16) if got_full_block else None
def bandpass_fft(x, fs, low, high):
    """Band-pass ``x`` by zeroing every FFT bin outside ``[low, high]`` Hz.

    Args:
        x: 1-D real NumPy array of samples.
        fs: Sample rate of ``x`` in Hz.
        low: Lowest frequency (Hz, inclusive) that is kept.
        high: Highest frequency (Hz, inclusive) that is kept.

    Returns:
        The filtered signal, same length as ``x`` and cast back to ``x.dtype``.
    """
    num_samples = len(x)
    spectrum = np.fft.rfft(x)
    bin_freqs = np.fft.rfftfreq(num_samples, d=1.0 / fs)
    # Boolean pass-band: multiplying by it zeroes the rejected bins.
    passband = np.logical_and(bin_freqs >= low, bin_freqs <= high)
    kept = spectrum * passband
    return np.fft.irfft(kept, n=num_samples).astype(x.dtype)
def gcc_phat(sig, refsig, fs, max_tau=None, interp=1):
    """Estimate the delay of ``sig`` relative to ``refsig`` using GCC-PHAT.

    The cross-power spectrum is normalised to unit magnitude (phase
    transform), transformed back to the lag domain, and the strongest peak
    within ``+/- max_tau`` is taken as the time difference of arrival.

    Args:
        sig: 1-D real array, the signal whose delay is measured.
        refsig: 1-D real array, the reference signal.
        fs: Sample rate of both signals in Hz.
        max_tau: Largest delay magnitude (seconds) to search. When ``None``
            it defaults to ``MIC_DISTANCE / SPEED_OF_SOUND``.
        interp: Integer up-sampling factor applied to the correlation for
            sub-sample resolution.

    Returns:
        ``(tau, confidence)`` where ``tau`` is the delay in seconds (positive
        when ``sig`` lags ``refsig``) and ``confidence`` is the peak value of
        the windowed correlation divided by its mean absolute value.
    """
    # Zero-pad to the combined length so the circular correlation computed
    # via the FFT does not wrap the two signals onto each other.
    n = sig.shape[0] + refsig.shape[0]
    SIG = np.fft.rfft(sig, n=n)
    REFSIG = np.fft.rfft(refsig, n=n)

    # PHAT weighting: keep only the phase of the cross-power spectrum.
    R = SIG * np.conj(REFSIG)
    R /= np.abs(R) + 1e-15  # epsilon guards against division by zero
    cc = np.fft.irfft(R, n=(interp * n))

    if max_tau is None:
        max_tau = MIC_DISTANCE / SPEED_OF_SOUND
    # Never ask for more lags than the correlation actually contains.
    max_shift = min(int(interp * fs * max_tau), (cc.shape[0] - 1) // 2)

    # irfft output is NOT centred: lag 0 is at index 0, positive lags follow,
    # and negative lags are wrapped to the end of the array. Build the window
    # [-max_shift, +max_shift] from those two ends. The explicit start index
    # (instead of cc[-max_shift:]) keeps max_shift == 0 working.
    cc = np.concatenate((cc[cc.shape[0] - max_shift:], cc[:max_shift + 1]))

    # Index max_shift of the window corresponds to zero lag.
    shift = np.argmax(cc) - max_shift
    tau = shift / float(interp * fs)

    # confidence: peak vs average correlation magnitude
    peak_val = np.max(cc)
    avg_val = np.mean(np.abs(cc))
    confidence = peak_val / (avg_val + 1e-9)
    return tau, confidence
def tau_to_angle(tau, mic_distance, speed_of_sound):
    """Convert a time difference of arrival into an arrival angle in degrees.

    Args:
        tau: Delay between the two microphones, in seconds.
        mic_distance: Spacing between the microphones, in meters.
        speed_of_sound: Propagation speed, in m/s.

    Returns:
        ``arcsin(tau * speed_of_sound / mic_distance)`` in degrees, with the
        arcsin argument clamped to ``[-1, 1]`` so the result lies in
        ``[-90, 90]``.
    """
    ratio = tau * speed_of_sound / mic_distance
    # Clamp so arcsin stays defined when the measured delay slightly exceeds
    # the physical maximum for this microphone spacing.
    if not ratio < 1.0:
        ratio = 1.0
    if not ratio > -1.0:
        ratio = -1.0
    return np.degrees(np.arcsin(ratio))
def main():
    """Read stereo blocks from the FIFO and print direction-tagged voice events.

    For every complete block the loop:
      1. computes the RMS energy over both channels and updates a slowly
         adapting baseline,
      2. skips the block while the cooldown is active or the RMS does not
         exceed ``baseline * MARGIN``,
      3. band-pass filters both channels, runs GCC-PHAT to estimate the
         time difference of arrival, converts it to an angle, and
      4. prints the event when the GCC-PHAT confidence exceeds 2.0.

    Runs until interrupted; returns nothing.
    """
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    baseline = None
    # -inf so the very first qualifying block is never suppressed, whatever
    # the monotonic clock's starting value is.
    last_trigger = float("-inf")

    with open(FIFO_PATH, "rb") as f:
        print("Listening for voice events (RMS + GCC-PHAT + confidence)...")
        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # An exhausted stream makes read() return immediately; back
                # off briefly instead of spinning at full CPU.
                time.sleep(0.01)
                continue

            # De-interleave and convert once. Working in float32 also avoids
            # int16 wrap-around (abs(-32768) == -32768) in the peak comparison.
            left = audio[0::2].astype(np.float32)
            right = audio[1::2].astype(np.float32)

            # RMS energy across both channels
            rms = np.sqrt(np.mean((left ** 2 + right ** 2) / 2))

            if baseline is None:
                # First block only seeds the baseline.
                baseline = rms
                continue

            baseline = (1 - ALPHA) * baseline + ALPHA * rms
            threshold = baseline * MARGIN

            # Monotonic clock: the cooldown must not be affected by
            # wall-clock adjustments.
            now = time.monotonic()
            if now - last_trigger < COOLDOWN:
                continue
            if rms <= threshold:
                continue

            # Band-pass filter to voice band
            l_bp = bandpass_fft(left, SAMPLE_RATE, BAND_LOW, BAND_HIGH)
            r_bp = bandpass_fft(right, SAMPLE_RATE, BAND_LOW, BAND_HIGH)

            # GCC-PHAT for TDOA + confidence
            tau, confidence = gcc_phat(
                l_bp, r_bp, SAMPLE_RATE,
                max_tau=MIC_DISTANCE / SPEED_OF_SOUND, interp=4,
            )
            angle = tau_to_angle(tau, MIC_DISTANCE, SPEED_OF_SOUND)

            # Only report strong detections
            if confidence > 2.0:
                ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                louder = "LEFT" if np.max(np.abs(left)) > np.max(np.abs(right)) else "RIGHT"
                print(f"[{ts}] Voice event: {louder} louder | RMS={rms:.1f}, baseline={baseline:.1f}, "
                      f"TDOA={tau*1000:.2f} ms | angle≈{angle:.1f}° | conf={confidence:.2f}")
                last_trigger = now


if __name__ == "__main__":
    main()