diff --git a/direction_finder.py b/direction_finder.py new file mode 100644 index 0000000..25c5ca5 --- /dev/null +++ b/direction_finder.py @@ -0,0 +1,68 @@ +import numpy as np +import time +from datetime import datetime + +FIFO_PATH = "/tmp/esp32_audio" +SAMPLE_RATE = 16000 +CHANNELS = 2 # set to 1 if mono +BLOCK_FRAMES = 512 # ~32 ms at 16 kHz +BYTES_PER_SAMPLE = 2 # s16le + +MARGIN = 3.0 # multiplier above baseline +ALPHA = 0.01 # smoothing factor for rolling baseline +COOLDOWN = 0.5 # seconds to ignore after a trigger + +def read_block(f, block_bytes): + data = f.read(block_bytes) + if not data: + return None + return np.frombuffer(data, dtype=np.int16) + +def main(): + block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE + + with open(FIFO_PATH, "rb") as f: + print("Listening with rolling baseline + cooldown...") + baseline = None + last_trigger = 0 + + while True: + audio = read_block(f, block_bytes) + if audio is None: + continue + + if CHANNELS == 2: + left = audio[0::2] + right = audio[1::2] + left_peak = np.max(np.abs(left)) + right_peak = np.max(np.abs(right)) + current_level = (left_peak + right_peak) / 2 + else: + current_level = np.max(np.abs(audio)) + + if baseline is None: + baseline = current_level + continue + + baseline = (1 - ALPHA) * baseline + ALPHA * current_level + threshold = baseline * MARGIN + + now = time.time() + if now - last_trigger < COOLDOWN: + continue # skip triggers during cooldown + + if CHANNELS == 2: + if left_peak > threshold or right_peak > threshold: + louder = "LEFT" if left_peak > right_peak else "RIGHT" + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + print(f"[{ts}] Loud noise! {louder} channel louder " + f"(L={left_peak}, R={right_peak}, baseline={baseline:.1f})") + last_trigger = now + else: + if current_level > threshold: + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + print(f"[{ts}] Loud noise! Peak={current_level}, baseline={baseline:.1f}") + last_trigger = now + +if __name__ == "__main__": + main()