added direction_finder.py, gets direction of loudest sound above a threshold over a rolling baseline
parent
85b213e194
commit
40a17acda9
|
|
@ -0,0 +1,68 @@
|
|||
import numpy as np
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
FIFO_PATH = "/tmp/esp32_audio"
|
||||
SAMPLE_RATE = 16000
|
||||
CHANNELS = 2 # set to 1 if mono
|
||||
BLOCK_FRAMES = 512 # ~32 ms at 16 kHz
|
||||
BYTES_PER_SAMPLE = 2 # s16le
|
||||
|
||||
MARGIN = 3.0 # multiplier above baseline
|
||||
ALPHA = 0.01 # smoothing factor for rolling baseline
|
||||
COOLDOWN = 0.5 # seconds to ignore after a trigger
|
||||
|
||||
def read_block(f, block_bytes):
|
||||
data = f.read(block_bytes)
|
||||
if not data:
|
||||
return None
|
||||
return np.frombuffer(data, dtype=np.int16)
|
||||
|
||||
def main():
|
||||
block_bytes = BLOCK_FRAMES * CHANNELS * BYTES_PER_SAMPLE
|
||||
|
||||
with open(FIFO_PATH, "rb") as f:
|
||||
print("Listening with rolling baseline + cooldown...")
|
||||
baseline = None
|
||||
last_trigger = 0
|
||||
|
||||
while True:
|
||||
audio = read_block(f, block_bytes)
|
||||
if audio is None:
|
||||
continue
|
||||
|
||||
if CHANNELS == 2:
|
||||
left = audio[0::2]
|
||||
right = audio[1::2]
|
||||
left_peak = np.max(np.abs(left))
|
||||
right_peak = np.max(np.abs(right))
|
||||
current_level = (left_peak + right_peak) / 2
|
||||
else:
|
||||
current_level = np.max(np.abs(audio))
|
||||
|
||||
if baseline is None:
|
||||
baseline = current_level
|
||||
continue
|
||||
|
||||
baseline = (1 - ALPHA) * baseline + ALPHA * current_level
|
||||
threshold = baseline * MARGIN
|
||||
|
||||
now = time.time()
|
||||
if now - last_trigger < COOLDOWN:
|
||||
continue # skip triggers during cooldown
|
||||
|
||||
if CHANNELS == 2:
|
||||
if left_peak > threshold or right_peak > threshold:
|
||||
louder = "LEFT" if left_peak > right_peak else "RIGHT"
|
||||
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
||||
print(f"[{ts}] Loud noise! {louder} channel louder "
|
||||
f"(L={left_peak}, R={right_peak}, baseline={baseline:.1f})")
|
||||
last_trigger = now
|
||||
else:
|
||||
if current_level > threshold:
|
||||
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
|
||||
print(f"[{ts}] Loud noise! Peak={current_level}, baseline={baseline:.1f}")
|
||||
last_trigger = now
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue