# Source listing: Python, 111 lines, 3.6 KiB
import ctypes
import time
import wave
from ctypes import c_void_p, c_float, POINTER
from ctypes.util import find_library

import numpy as np
from scipy.signal import butter, lfilter

# --- File endpoints ---
FIFO_PATH = "/tmp/esp32_audio"            # opened for binary reading by the capture loop
RAW_FILE = "raw_mono_48k.wav"             # upsampled + high-passed mono output
DENOISED_FILE = "denoised_mono_48k.wav"   # RNNoise output

# --- Stream format ---
IN_SR = 16000          # sample rate of the incoming stream (Hz)
TARGET_SR = 48000      # sample rate of both WAV outputs (Hz)
CHANNELS_IN = 2        # interleaved channels in the incoming stream
BYTES_PER_SAMPLE = 2   # incoming samples are parsed as 16-bit integers

# --- Framing ---
FRAME_SIZE = 480  # RNNoise frame size at 48kHz
# 160 samples @16kHz → upsample ×3 → 480 @48kHz
IN_FRAME_16K = FRAME_SIZE * IN_SR // TARGET_SR

# --- High-pass filter design ---
def highpass_filter(data, cutoff=100, fs=TARGET_SR, order=4, zi=None):
    """Apply a digital Butterworth high-pass filter to ``data``.

    With the default ``zi=None`` every call starts the filter from rest,
    which is only appropriate for filtering a complete signal in one go.
    When a stream is filtered chunk by chunk, pass the state returned by
    the previous call so that no transient is produced at chunk borders.

    Args:
        data: Samples to filter (filtered along the last axis by ``lfilter``).
        cutoff: Corner frequency in Hz.
        fs: Sampling rate of ``data`` in Hz.
        order: Order of the Butterworth design.
        zi: Optional filter state. For 1-D data use ``np.zeros(order)`` for
            the first chunk, then the ``zf`` returned by the previous call.

    Returns:
        The filtered samples when ``zi`` is None; otherwise the tuple
        ``(filtered, zf)`` where ``zf`` is the state to hand to the next call.

    Raises:
        ValueError: Propagated from ``scipy.signal.butter`` when
            ``cutoff`` is not strictly between 0 and ``fs / 2``.
    """
    nyquist = 0.5 * fs
    # butter() expects the corner frequency normalised to the Nyquist rate.
    b, a = butter(order, cutoff / nyquist, btype="high", analog=False)
    if zi is None:
        return lfilter(b, a, data)
    return lfilter(b, a, data, zi=zi)

# --- Simple linear upsample 16k → 48k (factor 3) ---
def upsample3(x):
    """Upsample ``x`` by a factor of 3 using linear interpolation.

    Each input sample is followed by two interpolated samples located 1/3
    and 2/3 of the way to the next input sample. The final input sample has
    no successor, so it is held for the last two output samples.

    Args:
        x: 1-D array-like of samples.

    Returns:
        A float32 array of length ``3 * len(x)`` (empty for empty input).
    """
    x = np.asarray(x, dtype=np.float32)
    out = np.empty(len(x) * 3, dtype=np.float32)
    if len(x) == 0:
        # x[-1] below would raise IndexError on an empty input.
        return out

    # Successor of every sample; the last sample is its own successor.
    nxt = np.append(x[1:], x[-1])

    out[0::3] = x
    out[1::3] = (2.0 * x + nxt) / 3.0   # 1/3 of the way to the next sample
    out[2::3] = (x + 2.0 * nxt) / 3.0   # 2/3 of the way (was the midpoint)
    return out

# --- Load RNNoise ---
libname = find_library("rnnoise")
if not libname:
    raise RuntimeError("librnnoise not found. Run sudo ldconfig after install.")
rn = ctypes.CDLL(libname)

# Declare the C signatures explicitly: ctypes otherwise assumes every function
# returns a C int, which would truncate the state pointer on 64-bit platforms.
rn.rnnoise_create.argtypes = [c_void_p]  # takes RNNModel* (NULL for default)
rn.rnnoise_create.restype = c_void_p
rn.rnnoise_destroy.argtypes = [c_void_p]
rn.rnnoise_destroy.restype = None  # declared void in rnnoise.h
rn.rnnoise_process_frame.argtypes = [
    c_void_p,           # denoiser state
    POINTER(c_float),   # output frame
    POINTER(c_float),   # input frame
]
rn.rnnoise_process_frame.restype = c_float

st = rn.rnnoise_create(None)  # NULL = default model
if not st:
    # A NULL state would otherwise be passed to rnnoise_process_frame later.
    raise RuntimeError("rnnoise_create returned NULL")

# --- Configure WAV writers ---
def _open_mono_writer(path):
    """Open ``path`` for writing as a 16-bit mono WAV file at TARGET_SR."""
    writer = wave.open(path, "wb")
    writer.setnchannels(1)
    writer.setsampwidth(2)
    writer.setframerate(TARGET_SR)
    return writer


# Opened in the same order as before: raw output first, then denoised.
raw_wav = _open_mono_writer(RAW_FILE)
den_wav = _open_mono_writer(DENOISED_FILE)

# --- Streaming loop: FIFO → mono → 48 kHz → high-pass → RNNoise → WAV ---
def _to_pcm16(samples):
    """Round float samples at 16-bit PCM scale, clip, and return int16 bytes."""
    return np.clip(np.rint(samples), -32768, 32767).astype(np.int16).tobytes()


# Size in bytes of one input frame (IN_FRAME_16K interleaved sample frames).
in_frame_bytes = IN_FRAME_16K * BYTES_PER_SAMPLE * CHANNELS_IN
# Bytes received but not yet processed. Accumulating bytes (rather than
# samples) keeps parsing aligned even if a read returns a partial frame.
pending = bytearray()

# Design the 100 Hz, 4th-order high-pass once and carry its state from frame
# to frame. Restarting the filter from rest on every 10 ms frame, as the
# one-shot highpass_filter() call did, produces a transient at each boundary.
hp_b, hp_a = butter(4, 100 / (0.5 * TARGET_SR), btype="high", analog=False)
hp_state = np.zeros(max(len(hp_a), len(hp_b)) - 1)

# Reusable frame buffers and their C pointers for the RNNoise call.
noisy48 = np.zeros(FRAME_SIZE, dtype=np.float32)
clean48 = np.zeros(FRAME_SIZE, dtype=np.float32)
noisy_ptr = noisy48.ctypes.data_as(POINTER(c_float))
clean_ptr = clean48.ctypes.data_as(POINTER(c_float))

print(f"Recording {FIFO_PATH} → {RAW_FILE}, {DENOISED_FILE}")
try:
    with open(FIFO_PATH, "rb") as f:
        while True:
            data = f.read(in_frame_bytes)
            if not data:
                # End of stream for now. Keep waiting for more data as before,
                # but sleep instead of spinning a CPU core on empty reads.
                time.sleep(0.01)
                continue
            pending += data

            # Process every complete input frame that has been received.
            while len(pending) >= in_frame_bytes:
                chunk = bytes(pending[:in_frame_bytes])
                del pending[:in_frame_bytes]

                # Downmix stereo → mono @16k. Samples stay at 16-bit PCM
                # scale: RNNoise's demo feeds it floats in that range, not
                # values normalised to [-1, 1].
                stereo = np.frombuffer(chunk, dtype=np.int16).reshape(-1, CHANNELS_IN)
                mono16 = stereo.mean(axis=1, dtype=np.float32)

                # Upsample to 48kHz (480 samples)
                up48 = upsample3(mono16)

                # --- Apply high-pass filter (state carried across frames) ---
                filtered, hp_state = lfilter(hp_b, hp_a, up48, zi=hp_state)
                noisy48[:] = filtered  # float64 → contiguous float32 buffer

                # --- Write raw upsampled mono (with HPF) ---
                raw_wav.writeframes(_to_pcm16(noisy48))

                # --- Denoise ---
                rn.rnnoise_process_frame(st, clean_ptr, noisy_ptr)
                den_wav.writeframes(_to_pcm16(clean48))
except KeyboardInterrupt:
    pass
finally:
    raw_wav.close()
    den_wav.close()
    rn.rnnoise_destroy(st)
    print(f"Saved {RAW_FILE} and {DENOISED_FILE}")