# serial_audio_catcher/direction_finder.py
#
# 69 lines
# 2.3 KiB
# Python

import numpy as np
import time
from datetime import datetime
# --- Input stream layout ---------------------------------------------------
# FIFO the raw audio stream is read from.
FIFO_PATH = "/tmp/esp32_audio"
# Sampling rate in Hz.
SAMPLE_RATE = 16_000
# Interleaved channel count; set to 1 if the stream is mono.
CHANNELS = 2
# Frames consumed per read; about 32 ms of audio at 16 kHz.
BLOCK_FRAMES = 512
# Width of one sample in bytes (signed 16-bit little-endian, s16le).
BYTES_PER_SAMPLE = 2

# --- Detection tuning ------------------------------------------------------
# A block counts as loud when its peak exceeds baseline * MARGIN.
MARGIN = 3.0
# Smoothing factor of the rolling baseline.
ALPHA = 0.01
# Seconds during which further triggers are ignored after one fires.
COOLDOWN = 0.5
def read_block(f, block_bytes):
    """Read one block from *f* and decode it as signed 16-bit little-endian PCM.

    Args:
        f: Binary file-like object whose ``read(n)`` returns ``bytes``.
        block_bytes: Maximum number of bytes to request from *f*.

    Returns:
        A 1-D NumPy array of int16 samples, or ``None`` when the read produced
        no complete sample (nothing was returned, or only a lone trailing
        byte was available).
    """
    data = f.read(block_bytes)
    if not data:
        return None

    # The stream is s16le, so decode as explicit little-endian rather than the
    # host's native int16; this keeps the values right on big-endian machines.
    sample_dtype = np.dtype("<i2")

    # A read that ends mid-sample (odd byte count) would make np.frombuffer
    # raise ValueError; decode only the whole samples and drop the remainder.
    n_samples = len(data) // sample_dtype.itemsize
    if n_samples == 0:
        return None
    return np.frombuffer(data, dtype=sample_dtype, count=n_samples)
def _peak_level(samples):
    """Return the largest absolute value in *samples* as a Python int.

    The samples are widened to int32 before taking the absolute value:
    in int16, abs(-32768) wraps back to -32768 and would hide a full-scale
    negative peak.
    """
    return int(np.max(np.abs(samples.astype(np.int32))))


def main():
    """Listen on the FIFO and report blocks that are much louder than usual.

    Reads fixed-size blocks from ``FIFO_PATH`` forever. For each block the
    peak level is folded into an exponentially smoothed baseline (factor
    ``ALPHA``); a block whose peak exceeds ``baseline * MARGIN`` prints a
    timestamped message, naming the louder channel when ``CHANNELS == 2``.
    After a trigger, further triggers are suppressed for ``COOLDOWN`` seconds.

    The function does not return; it runs until the process is interrupted
    or an exception propagates.
    """
    block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
    eof_poll_seconds = 0.01

    with open(FIFO_PATH, "rb") as f:
        print("Listening with rolling baseline + cooldown...")
        baseline = None
        # -inf guarantees the first trigger is never suppressed, whatever the
        # origin of the monotonic clock happens to be.
        last_trigger = float("-inf")

        while True:
            audio = read_block(f, block_bytes)
            if audio is None:
                # Nothing to read right now (e.g. the writer closed the FIFO).
                # Back off briefly instead of spinning at 100% CPU.
                time.sleep(eof_poll_seconds)
                continue

            if CHANNELS == 2:
                # A short read can leave an unpaired trailing sample; drop it
                # so both channels always have the same number of samples.
                audio = audio[: audio.size - (audio.size % 2)]
            if audio.size == 0:
                continue

            if CHANNELS == 2:
                # Interleaved stereo: even indices are left, odd are right.
                left_peak = _peak_level(audio[0::2])
                right_peak = _peak_level(audio[1::2])
                # Python ints here, so the sum cannot overflow like int16.
                current_level = (left_peak + right_peak) / 2
            else:
                current_level = _peak_level(audio)

            if baseline is None:
                # The first block only seeds the baseline.
                baseline = current_level
                continue

            # The baseline keeps adapting on every block, including loud ones
            # and those that arrive during the cooldown window.
            baseline = (1 - ALPHA) * baseline + ALPHA * current_level
            threshold = baseline * MARGIN

            # Monotonic clock: the cooldown must not be affected by wall-clock
            # adjustments.
            now = time.monotonic()
            if now - last_trigger < COOLDOWN:
                continue  # skip triggers during cooldown

            if CHANNELS == 2:
                if left_peak > threshold or right_peak > threshold:
                    # Ties are reported as RIGHT.
                    louder = "LEFT" if left_peak > right_peak else "RIGHT"
                    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    print(f"[{ts}] Loud noise! {louder} channel louder "
                          f"(L={left_peak}, R={right_peak}, baseline={baseline:.1f})")
                    last_trigger = now
            else:
                if current_level > threshold:
                    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                    print(f"[{ts}] Loud noise! Peak={current_level}, baseline={baseline:.1f}")
                    last_trigger = now
# Start the listener only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()