serial_audio_catcher/rntest.py

111 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import ctypes
import time
import wave
from ctypes import c_void_p, c_float, POINTER
from ctypes.util import find_library

import numpy as np
from scipy.signal import butter, lfilter
# --- Input / output locations ---
FIFO_PATH = "/tmp/esp32_audio"            # path the incoming PCM byte stream is read from
RAW_FILE = "raw_mono_48k.wav"             # upsampled mono capture
DENOISED_FILE = "denoised_mono_48k.wav"   # the same audio after RNNoise

# --- Stream format ---
IN_SR, TARGET_SR = 16000, 48000           # input rate and processing/output rate, in Hz
CHANNELS_IN = 2                           # interleaved channels in the input stream
BYTES_PER_SAMPLE = 2                      # size of one input sample, in bytes

# --- Framing ---
FRAME_SIZE = 480                          # RNNoise frame size at 48kHz
# Input samples that become one RNNoise frame after the x3 upsample:
# 480 * 16000 // 48000 == 160.
IN_FRAME_16K = FRAME_SIZE * IN_SR // TARGET_SR
# --- Butterworth high-pass ---
def highpass_filter(data, cutoff=100, fs=TARGET_SR, order=4):
    """Run *data* through a digital Butterworth high-pass filter.

    Args:
        data: Samples to filter; passed straight to ``scipy.signal.lfilter``.
        cutoff: Corner frequency, in the same unit as *fs*.
        fs: Sampling rate of *data*.
        order: Order of the Butterworth design.

    Returns:
        The filtered samples, as returned by ``lfilter``.

    Note:
        The coefficients are designed on every call and no filter state is
        passed to ``lfilter``, so each call filters independently of any
        previous call.
    """
    nyquist = 0.5 * fs
    numerator, denominator = butter(
        order, cutoff / nyquist, btype="high", analog=False
    )
    filtered = lfilter(numerator, denominator, data)
    return filtered
# --- Simple linear upsample 16k → 48k (factor 3) ---
def upsample3(x):
    """Upsample *x* by a factor of 3 using linear interpolation.

    Every input sample is kept and followed by two interpolated samples that
    lie 1/3 and 2/3 of the way towards the next input sample. The last input
    sample has no successor inside *x*, so it is simply held (repeated).

    Args:
        x: 1-D sequence of samples (any numeric dtype; converted to float32
            before the arithmetic so integer input cannot overflow).

    Returns:
        A float32 array of length ``3 * len(x)``; empty when *x* is empty.
    """
    x = np.asarray(x, dtype=np.float32)
    out = np.empty(len(x) * 3, dtype=np.float32)
    if len(x) == 0:
        # Nothing to interpolate; x[-1] below would raise on empty input.
        return out

    # Successor of each sample; the final sample is its own successor.
    nxt = np.append(x[1:], x[-1])

    out[0::3] = x
    out[1::3] = (2 * x + nxt) / 3.0   # 1/3 of the way to the next sample
    out[2::3] = (x + 2 * nxt) / 3.0   # 2/3 of the way to the next sample
    return out
# --- Load RNNoise ---
libname = find_library("rnnoise")
if not libname:
    raise RuntimeError("librnnoise not found. Run sudo ldconfig after install.")
rn = ctypes.CDLL(libname)

# Declare the C prototypes so ctypes marshals pointers and floats correctly.
# rnnoise_create takes an RNNModel* (NULL for the default model) and returns
# the opaque denoiser state.
rn.rnnoise_create.argtypes = [c_void_p]
rn.rnnoise_create.restype = c_void_p
# rnnoise_destroy returns void; without restype = None ctypes would read a
# meaningless int return value.
rn.rnnoise_destroy.argtypes = [c_void_p]
rn.rnnoise_destroy.restype = None
# rnnoise_process_frame(state, out, in) returns a float.
rn.rnnoise_process_frame.argtypes = [
    c_void_p,
    POINTER(c_float),
    POINTER(c_float),
]
rn.rnnoise_process_frame.restype = c_float

st = rn.rnnoise_create(None)  # NULL = default model
if not st:
    # A c_void_p restype maps a NULL return to None; fail here instead of
    # handing a NULL state to the library later.
    raise RuntimeError("rnnoise_create() returned NULL; cannot create denoiser state.")
# --- Record: stereo 16 kHz byte stream -> mono 48 kHz WAV (raw + denoised) ---
# Size in bytes of one interleaved input frame (one sample per channel).
BYTES_PER_STEREO_FRAME = BYTES_PER_SAMPLE * CHANNELS_IN
# One read request covers exactly one 160-sample input frame.
READ_SIZE = IN_FRAME_16K * BYTES_PER_STEREO_FRAME
# Pause between polls while the input reports end-of-file, so waiting for
# more data does not spin a CPU core.
EOF_POLL_SECONDS = 0.01

# Design the 100 Hz, 4th-order high-pass once and carry its state from frame
# to frame; restarting the filter from rest on every 10 ms frame would inject
# a transient at each frame boundary.
hp_b, hp_a = butter(4, 100 / (0.5 * TARGET_SR), btype="high", analog=False)
hp_state = np.zeros(max(len(hp_a), len(hp_b)) - 1)


def _pcm16_bytes(samples):
    """Clip float samples to the int16 range and return them as raw bytes."""
    return np.clip(samples, -32768, 32767).astype(np.int16).tobytes()


pending = b""                                   # bytes not yet forming a whole stereo frame
buf = np.empty((0,), dtype=np.int16)            # mono samples awaiting a full input frame
out48 = np.zeros(FRAME_SIZE, dtype=np.float32)  # reused RNNoise output buffer
out48_ptr = out48.ctypes.data_as(POINTER(c_float))

print(f"Recording {FIFO_PATH} -> {RAW_FILE}, {DENOISED_FILE}")
try:
    # Context managers close both WAV files even if a later open() fails.
    with wave.open(RAW_FILE, "wb") as raw_wav, wave.open(DENOISED_FILE, "wb") as den_wav:
        for writer in (raw_wav, den_wav):
            writer.setnchannels(1)
            writer.setsampwidth(2)
            writer.setframerate(TARGET_SR)

        with open(FIFO_PATH, "rb") as f:
            while True:
                data = f.read(READ_SIZE)
                if not data:
                    # End-of-file: keep polling for more data, but back off.
                    time.sleep(EOF_POLL_SECONDS)
                    continue

                # A read may end mid-frame; only decode whole stereo frames
                # and keep the remainder for the next read.
                pending += data
                usable = len(pending) - len(pending) % BYTES_PER_STEREO_FRAME
                if usable == 0:
                    continue
                stereo = np.frombuffer(pending[:usable], dtype=np.int16)
                stereo = stereo.reshape(-1, CHANNELS_IN)
                pending = pending[usable:]

                # Downmix stereo -> mono @16k
                mono16 = stereo.mean(axis=1).astype(np.int16)
                buf = np.concatenate([buf, mono16])

                # Process every complete 160-sample frame that is buffered.
                while len(buf) >= IN_FRAME_16K:
                    # Keep samples in the int16 value range (no /32768):
                    # RNNoise's reference demo feeds it floats converted
                    # directly from 16-bit PCM.
                    frame16 = buf[:IN_FRAME_16K].astype(np.float32)
                    buf = buf[IN_FRAME_16K:]

                    # Upsample to 48kHz (480 samples), then high-pass with
                    # the state carried over from the previous frame.
                    frame48, hp_state = lfilter(
                        hp_b, hp_a, upsample3(frame16), zi=hp_state
                    )
                    # RNNoise needs a C-contiguous float32 buffer.
                    frame48 = np.ascontiguousarray(frame48, dtype=np.float32)

                    # --- Write raw upsampled mono (with HPF) ---
                    raw_wav.writeframes(_pcm16_bytes(frame48))

                    # --- Denoise ---
                    rn.rnnoise_process_frame(
                        st,
                        out48_ptr,
                        frame48.ctypes.data_as(POINTER(c_float)),
                    )
                    den_wav.writeframes(_pcm16_bytes(out48))
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop recording.
    pass
finally:
    rn.rnnoise_destroy(st)
    print(f"Saved {RAW_FILE} and {DENOISED_FILE}")