248 lines
8.6 KiB
Python
248 lines
8.6 KiB
Python
from flask import Flask, Response, jsonify
|
|
import cv2
|
|
import numpy as np
|
|
from math import ceil
|
|
from itertools import product
|
|
from rknnlite.api import RKNNLite
|
|
import threading
|
|
import sounddevice as sd
|
|
import queue
|
|
import json
|
|
from vosk import Model, KaldiRecognizer
|
|
import time
|
|
|
|
def speech_loop():
    """Transcribe microphone audio forever and publish it in ``latest_speech``.

    Opens a 16 kHz, mono, 16-bit raw input stream and feeds every captured
    block to a Vosk recognizer. The decoded JSON of the newest result (final
    or partial) is stored in the module-level ``latest_speech`` and echoed to
    stdout. This function never returns; it is meant to run in its own thread.
    """
    global latest_speech

    vosk_model = Model("./vosk-model-small-en-us-0.15")
    recognizer = KaldiRecognizer(vosk_model, 16000)
    audio_blocks = queue.Queue()

    def on_audio(indata, frames, time_info, status):
        # Stream callback: report any status flag, then hand the raw bytes
        # over to the recognition loop below through the queue.
        if status:
            print(status)
        audio_blocks.put(bytes(indata))

    stream = sd.RawInputStream(samplerate=16000, blocksize=8000, dtype='int16',
                               channels=1, callback=on_audio)
    with stream:
        while True:
            chunk = audio_blocks.get()
            if not recognizer.AcceptWaveform(chunk):
                # Recognizer has not finalised a result for this audio yet:
                # publish the running partial result.
                interim = json.loads(recognizer.PartialResult())
                latest_speech = interim
                print("...", interim, end='\r')
                continue
            final = json.loads(recognizer.Result())
            latest_speech = final
            print(".", final)
|
|
|
|
|
|
# --- RetinaFace Utilities ---
|
|
def letterbox_resize(image, size, bg_color):
    """Scale ``image`` to fit inside ``size`` and pad the rest with ``bg_color``.

    Args:
        image: Image array whose shape unpacks as (height, width, channels).
        size: ``(target_width, target_height)`` of the output canvas.
        bg_color: Value used to fill the canvas around the scaled image.

    Returns:
        Tuple ``(canvas, scale, offset_x, offset_y)``: the padded image, the
        uniform resize factor that was applied, and the position of the
        scaled image's top-left corner on the canvas.
    """
    canvas_w, canvas_h = size
    src_h, src_w, _ = image.shape

    # One factor for both axes keeps the aspect ratio; the smaller ratio
    # guarantees the scaled image fits inside the canvas.
    scale = min(canvas_w / src_w, canvas_h / src_h)
    scaled_w = int(src_w * scale)
    scaled_h = int(src_h * scale)
    scaled = cv2.resize(image, (scaled_w, scaled_h), interpolation=cv2.INTER_AREA)

    # Centre the scaled image on a canvas pre-filled with the background value.
    offset_x = (canvas_w - scaled_w) // 2
    offset_y = (canvas_h - scaled_h) // 2
    canvas = np.ones((canvas_h, canvas_w, 3), dtype=np.uint8) * bg_color
    canvas[offset_y:offset_y + scaled_h, offset_x:offset_x + scaled_w] = scaled
    return canvas, scale, offset_x, offset_y
|
|
|
|
def PriorBox(image_size):
    """Build the anchor (prior) boxes for a model input of ``image_size``.

    Args:
        image_size: Two-element sequence; index 0 is the extent along y
            (height) and index 1 the extent along x (width).

    Returns:
        ``(N, 4)`` array of ``[cx, cy, w, h]`` rows, each value divided by the
        matching image extent. Anchors are emitted per stride (8, 16, 32), row
        by row over the feature-map grid, with two anchor sizes per cell.
    """
    strides = [8, 16, 32]
    sizes_per_level = [[16, 32], [64, 128], [256, 512]]
    height, width = image_size[0], image_size[1]

    rows_out = []
    for stride, level_sizes in zip(strides, sizes_per_level):
        grid_rows = ceil(height / stride)
        grid_cols = ceil(width / stride)
        for i in range(grid_rows):
            for j in range(grid_cols):
                # Anchor centre sits in the middle of the grid cell.
                cx = (j + 0.5) * stride / width
                cy = (i + 0.5) * stride / height
                for min_size in level_sizes:
                    rows_out.append([cx, cy, min_size / width, min_size / height])
    return np.array(rows_out).reshape(-1, 4)
|
|
|
|
def box_decode(loc, priors):
    """Convert regression offsets into corner-format boxes.

    Args:
        loc: ``(N, 4)`` array; columns 0-1 are centre offsets, columns 2-3 are
            log-scale size offsets.
        priors: ``(N, 4)`` array of anchors as ``[cx, cy, w, h]``.

    Returns:
        New ``(N, 4)`` array of ``[x1, y1, x2, y2]`` in the same units as
        ``priors``. Neither input is modified.
    """
    center_variance, size_variance = 0.1, 0.2
    prior_centers = priors[:, :2]
    prior_sizes = priors[:, 2:]

    centers = prior_centers + loc[:, :2] * center_variance * prior_sizes
    sizes = prior_sizes * np.exp(loc[:, 2:] * size_variance)

    # Centre/size form -> top-left and bottom-right corners.
    top_left = centers - sizes / 2
    bottom_right = sizes + top_left
    return np.concatenate((top_left, bottom_right), axis=1)
|
|
|
|
def decode_landm(pre, priors):
    """Decode five landmark offsets per anchor into ``(x, y)`` coordinates.

    Args:
        pre: ``(N, 10)`` array of offsets, laid out as five consecutive
            ``(dx, dy)`` pairs.
        priors: ``(N, 4)`` array of anchors as ``[cx, cy, w, h]``.

    Returns:
        ``(N, 10)`` array of ``x0, y0, ..., x4, y4`` in the same units as
        ``priors``.
    """
    center_variance = 0.1
    anchor_xy = priors[:, :2]
    anchor_wh = priors[:, 2:]

    # Every landmark is an offset from the anchor centre scaled by its size.
    points = [
        anchor_xy + pre[:, start:start + 2] * center_variance * anchor_wh
        for start in range(0, 10, 2)
    ]
    return np.concatenate(points, axis=1)
|
|
|
|
def nms(dets, thresh):
    """Greedy non-maximum suppression over scored boxes.

    Args:
        dets: ``(N, 5)`` array with rows ``[x1, y1, x2, y2, score]``.
        thresh: Overlap (IoU) limit; a box is dropped when its IoU with an
            already kept, higher-scoring box exceeds this value.

    Returns:
        List of row indices into ``dets`` that survive, highest score first.
    """
    left, top, right, bottom = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
    # The +1 treats coordinates as inclusive pixel indices.
    areas = (right - left + 1) * (bottom - top + 1)

    remaining = dets[:, 4].argsort()[::-1]
    kept = []
    while remaining.size > 0:
        best, others = remaining[0], remaining[1:]
        kept.append(best)

        overlap_w = np.maximum(
            0.0,
            np.minimum(right[best], right[others]) - np.maximum(left[best], left[others]) + 1,
        )
        overlap_h = np.maximum(
            0.0,
            np.minimum(bottom[best], bottom[others]) - np.maximum(top[best], top[others]) + 1,
        )
        intersection = overlap_w * overlap_h
        iou = intersection / (areas[best] + areas[others] - intersection)

        # Only boxes that do not overlap the winner too much stay in the race.
        remaining = others[iou <= thresh]
    return kept
|
|
|
|
# --- RKNN Initialization ---
|
|
rknn = RKNNLite()
# load_rknn() and init_runtime() signal failure through a non-zero return code
# instead of raising. Fail fast here with a clear message rather than letting
# the inference thread crash later with a far less obvious error.
if rknn.load_rknn('./RetinaFace.rknn') != 0:
    raise RuntimeError("Failed to load RKNN model './RetinaFace.rknn'")
if rknn.init_runtime() != 0:
    raise RuntimeError("Failed to initialise the RKNN runtime")
|
|
|
|
# --- Shared State ---
|
|
# JPEG bytes of the newest annotated frame; None until one has been encoded.
latest_frame = None
# Face records belonging to the newest annotated frame (served by /faces).
latest_faces = []
# Newest speech-recognition result (served by /speech). It must exist at
# module level: speech_loop only assigns it after its first audio block has
# been processed, and a /speech request arriving before that would otherwise
# fail with NameError.
latest_speech = {}
# Capture device 0, read by the inference loop.
cap = cv2.VideoCapture(0)
|
|
|
|
# --- Background Inference Loop ---
|
|
def background_loop():
    """Capture camera frames, detect faces and publish the annotated result.

    Runs forever; intended to be the target of a background thread. For every
    frame read from ``cap`` it:

    1. letterboxes the frame to the 320x320 model input and runs ``rknn``
       inference,
    2. decodes boxes and landmarks against the prior boxes and maps them back
       from letterbox coordinates to original frame pixels,
    3. keeps candidates scoring above 0.2, applies NMS (IoU 0.5), and for every
       survivor scoring at least 0.6 draws the box, score and five landmarks,
    4. overlays the measured FPS, JPEG-encodes the frame and stores the bytes
       in ``latest_frame`` and the per-face records in ``latest_faces``.

    Each face record is ``{"box": [x1, y1, x2, y2], "confidence": float,
    "offset_from_center": {"x": float, "y": float}}`` where the offset is the
    box centre relative to the frame centre, in pixels.
    """
    global latest_frame, latest_faces

    model_size = (320, 320)
    priors = PriorBox(model_size)
    # Loop-invariant factors that turn normalised outputs into model pixels.
    box_scale = np.array([model_size[1], model_size[0], model_size[1], model_size[0]])
    landmark_scale = np.tile(np.array([model_size[1], model_size[0]]), 5)
    prev_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            # Back off briefly so a missing or stalled camera does not spin a
            # CPU core at 100 %.
            time.sleep(0.01)
            continue

        curr_time = time.time()
        elapsed = curr_time - prev_time
        # Two reads can land on the same clock tick; avoid ZeroDivisionError.
        fps = 1.0 / elapsed if elapsed > 0 else 0.0
        prev_time = curr_time

        img_height, img_width, _ = frame.shape
        letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, model_size, 114)
        infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)

        outputs = rknn.inference(inputs=[infer_img])
        if outputs is None:
            continue

        loc, conf, landms = outputs

        # Boxes: normalised -> model pixels -> original frame pixels (undo the
        # letterbox padding and scaling), clipped to the frame.
        boxes = box_decode(loc.squeeze(0), priors)
        boxes *= box_scale
        boxes[:, 0::2] = np.clip((boxes[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
        boxes[:, 1::2] = np.clip((boxes[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

        scores = conf.squeeze(0)[:, 1]

        # Landmarks go through the same coordinate mapping as the boxes.
        landms = decode_landm(landms.squeeze(0), priors)
        landms *= landmark_scale
        landms[:, 0::2] = np.clip((landms[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
        landms[:, 1::2] = np.clip((landms[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

        # Cheap pre-filter before NMS, then sort by descending score.
        inds = np.where(scores > 0.2)[0]
        boxes, landms, scores = boxes[inds], landms[inds], scores[inds]
        order = scores.argsort()[::-1]
        boxes, landms, scores = boxes[order], landms[order], scores[order]

        dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32)
        keep = nms(dets, 0.5)
        dets, landms = dets[keep], landms[keep]

        face_data = []
        frame_center = np.array([img_width / 2, img_height / 2])

        for det, landmark in zip(dets, landms):
            if det[4] < 0.6:
                continue
            x1, y1, x2, y2 = map(int, det[:4])
            # Separate name: do not clobber the model output `conf` above.
            score = float(det[4])
            box_center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            offset = box_center - frame_center
            # Plain Python numbers only, so the record is JSON-serialisable.
            face_data.append({
                "box": [x1, y1, x2, y2],
                "confidence": score,
                "offset_from_center": {
                    "x": float(offset[0]),
                    "y": float(offset[1]),
                },
            })

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, f'{score:.4f}', (x1, y1 + 12), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
            for j in range(0, 10, 2):
                lx, ly = map(int, landmark[j:j + 2])
                cv2.circle(frame, (lx, ly), 1, (0, 255, 255), 2)

        cv2.putText(frame, f'FPS: {fps:.2f}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        ret, buffer = cv2.imencode('.jpg', frame)
        if ret:
            # Publish image and face list together so they describe the same frame.
            latest_frame = buffer.tobytes()
            latest_faces = face_data
|
|
|
|
# --- Flask App ---
|
|
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)
|
|
|
|
@app.route('/')
def index():
    """Serve the single-page UI: the live camera stream plus live speech text.

    The page embeds the MJPEG stream from ``/video_feed`` and polls
    ``/speech`` every 300 ms, showing the partial or final transcript.
    """
    return '''
<html>
<head><title>RetinaFace + Speech</title></head>
<body>
<h2>Live Stream</h2>
<img src="/video_feed" width="640" />
<h3>Live Speech</h3>
<div id="speech" style="font-size:1.2em; font-family:monospace;"></div>
<script>
async function pollSpeech() {
  try {
    const res = await fetch('/speech');
    const data = await res.json();
    const text = data.partial || data.text || '';
    document.getElementById('speech').innerText = text;
  } catch (err) {
    console.error(err);
  } finally {
    setTimeout(pollSpeech, 300);
  }
}
pollSpeech();
</script>
</body>
</html>
'''


def stream_frames():
    """Yield the newest annotated frame forever as multipart MJPEG parts.

    Each part uses the ``frame`` boundary and an ``image/jpeg`` content type.
    Nothing is yielded while no frame has been published yet.
    """
    while True:
        # Snapshot the global once: the inference thread may rebind it at any
        # moment, and the check and the yield must see the same object.
        frame = latest_frame
        if frame:
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
        # Pace the generator: without a pause it spins at full speed, burning
        # CPU and re-sending the same frame over and over.
        time.sleep(0.03)


@app.route('/video_feed')
def video_feed():
    """Stream ``stream_frames()`` to the client as ``multipart/x-mixed-replace``."""
    return Response(stream_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')
|
|
|
|
@app.route('/faces')
def get_faces():
    """Return the current content of ``latest_faces`` as a JSON response."""
    faces = latest_faces
    return jsonify(faces)
|
|
|
|
@app.route('/speech')
def get_speech():
    """Return the current content of ``latest_speech`` as a JSON response."""
    speech = latest_speech
    return jsonify(speech)
|
|
|
|
|
|
# --- Start Background Thread ---
|
|
# Start the inference worker first, then the speech worker. Both are daemon
# threads, so they do not keep the process alive once the main thread exits.
for worker in (background_loop, speech_loop):
    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
# Start Flask's built-in server only when this file is executed as a script.
# host='0.0.0.0' listens on all network interfaces, port 5000.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
|