"""Flask dashboard: RetinaFace face detection on an RKNN NPU plus Vosk speech-to-text.

Two daemon threads keep module-level "latest" values up to date (annotated JPEG
frame, detected faces, speech transcripts); the Flask routes only read them.
"""

import json
import queue
import threading
import time
from itertools import product
from math import ceil

import cv2
import numpy as np
import sounddevice as sd
from flask import Flask, Response, jsonify
from rknnlite.api import RKNNLite
from vosk import Model, KaldiRecognizer

# Most recent recogniser output. speech_loop() rebinds these names to fresh
# dicts; the /speech route reads them.
latest_partial = {"partial": ""}
latest_result = {"text": ""}


def speech_loop():
    """Continuously transcribe microphone audio with Vosk.

    Never returns. Each finished utterance replaces ``latest_result`` and each
    interim hypothesis replaces ``latest_partial``; both are also printed.
    """
    global latest_partial, latest_result

    model = Model("./vosk-model-small-en-us-0.15")
    rec = KaldiRecognizer(model, 16000)
    q = queue.Queue()

    # The third argument is named ``time_info`` (not ``time``) so it does not
    # shadow the ``time`` module; sounddevice passes arguments positionally.
    def callback(indata, frames, time_info, status):
        if status:
            print(status)
        # Copy the buffer: hand the consumer loop its own bytes object.
        q.put(bytes(indata))

    # 8000 frames at 16 kHz -> one block every half second.
    with sd.RawInputStream(samplerate=16000, blocksize=8000, dtype='int16',
                           channels=1, callback=callback):
        while True:
            data = q.get()
            if rec.AcceptWaveform(data):
                result = json.loads(rec.Result())
                latest_result = result
                print(".", result)
            else:
                partial = json.loads(rec.PartialResult())
                latest_partial = partial
                print("...", partial.get("partial", ""), end='\r')


# --- RetinaFace Utilities ---
def letterbox_resize(image, size, bg_color):
    """Resize ``image`` to fit inside ``size`` keeping aspect ratio, padding the rest.

    Args:
        image: H x W x 3 array (its ``shape`` is unpacked as height, width, channels).
        size: ``(target_width, target_height)`` of the output canvas.
        bg_color: Fill value for the padding.

    Returns:
        ``(canvas, scale, offset_x, offset_y)`` where ``scale`` is the factor
        applied to the image and the offsets locate its top-left corner on the
        canvas.
    """
    target_width, target_height = size
    image_height, image_width, _ = image.shape

    aspect_ratio = min(target_width / image_width, target_height / image_height)
    new_width = int(image_width * aspect_ratio)
    new_height = int(image_height * aspect_ratio)
    image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)

    # np.full pins the canvas to uint8 regardless of how bg_color is typed.
    result_image = np.full((target_height, target_width, 3), bg_color, dtype=np.uint8)
    offset_x = (target_width - new_width) // 2
    offset_y = (target_height - new_height) // 2
    result_image[offset_y:offset_y + new_height, offset_x:offset_x + new_width] = image
    return result_image, aspect_ratio, offset_x, offset_y


def PriorBox(image_size):
    """Build the anchor (prior) boxes for the three detection strides.

    Args:
        image_size: ``(height, width)`` of the model input.

    Returns:
        Array of shape ``(N, 4)`` with rows ``[cx, cy, w, h]`` normalised by
        the image size; two anchors per feature-map cell, in row-major order.
    """
    min_sizes = [[16, 32], [64, 128], [256, 512]]
    steps = [8, 16, 32]

    anchors = []
    for step, sizes in zip(steps, min_sizes):
        rows = ceil(image_size[0] / step)
        cols = ceil(image_size[1] / step)
        for i, j in product(range(rows), range(cols)):
            # Anchor centre = centre of the feature-map cell.
            cx = (j + 0.5) * step / image_size[1]
            cy = (i + 0.5) * step / image_size[0]
            for min_size in sizes:
                anchors += [cx, cy, min_size / image_size[1], min_size / image_size[0]]
    return np.array(anchors).reshape(-1, 4)


def box_decode(loc, priors):
    """Turn regression output into corner-form boxes.

    Args:
        loc: ``(N, 4)`` offsets predicted relative to ``priors``.
        priors: ``(N, 4)`` anchors as ``[cx, cy, w, h]``.

    Returns:
        ``(N, 4)`` array of ``[x1, y1, x2, y2]`` in the priors' coordinate units.
    """
    variances = [0.1, 0.2]
    boxes = np.concatenate((
        priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
        priors[:, 2:] * np.exp(loc[:, 2:] * variances[1])), axis=1)
    # Centre/size -> top-left/bottom-right.
    boxes[:, :2] -= boxes[:, 2:] / 2
    boxes[:, 2:] += boxes[:, :2]
    return boxes


def decode_landm(pre, priors):
    """Decode five landmark points per anchor.

    Args:
        pre: ``(N, 10)`` predicted offsets, laid out as five ``(x, y)`` pairs.
        priors: ``(N, 4)`` anchors as ``[cx, cy, w, h]``.

    Returns:
        ``(N, 10)`` array of landmark coordinates in the priors' units.
    """
    variances = [0.1, 0.2]
    centers, sizes = priors[:, :2], priors[:, 2:]
    points = [centers + pre[:, k:k + 2] * variances[0] * sizes
              for k in range(0, 10, 2)]
    return np.concatenate(points, axis=1)


def nms(dets, thresh):
    """Greedy non-maximum suppression.

    Args:
        dets: ``(N, 5)`` array with rows ``[x1, y1, x2, y2, score]``.
        thresh: Boxes whose IoU with an already kept box exceeds this are dropped.

    Returns:
        List of row indices into ``dets`` to keep, highest score first.
    """
    x1, y1, x2, y2, scores = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3], dets[:, 4]
    # The +1 treats coordinates as inclusive pixel indices.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        rest = order[1:]
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[rest] - inter)
        inds = np.where(ovr <= thresh)[0]
        # inds index into ``rest``; shift by one to index into ``order``.
        order = order[inds + 1]
    return keep


# --- RKNN Initialization ---
# Fail fast: both calls report failure through a non-zero return code, and a
# model that did not load would otherwise only show up as failed inference.
rknn = RKNNLite()
if rknn.load_rknn('./RetinaFace.rknn') != 0:
    raise RuntimeError("Failed to load RKNN model './RetinaFace.rknn'")
if rknn.init_runtime() != 0:
    raise RuntimeError("Failed to initialise the RKNN runtime")

# --- Shared State ---
# Written by background_loop(), read by the Flask routes.
latest_frame = None   # JPEG bytes of the most recent annotated frame
latest_faces = []     # list of dicts describing the faces in that frame
cap = cv2.VideoCapture(0)


# --- Background Inference Loop ---
def background_loop():
    """Grab camera frames, run RetinaFace, and publish the annotated result.

    Never returns. For every successfully processed frame it updates
    ``latest_frame`` (JPEG bytes with boxes, landmarks and FPS drawn on it)
    and ``latest_faces`` (one dict per face with score >= 0.6).
    """
    global latest_frame, latest_faces

    # NOTE(review): PriorBox reads this as (height, width) while
    # letterbox_resize reads it as (width, height); harmless while square.
    model_size = (320, 320)
    priors = PriorBox(model_size)
    # Loop-invariant scales from normalised coords to model-input pixels.
    box_scale = np.array([model_size[1], model_size[0], model_size[1], model_size[0]])
    landm_scale = np.tile(np.array([model_size[1], model_size[0]]), 5)
    prev_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            # Back off briefly instead of spinning while the camera has no frame.
            time.sleep(0.01)
            continue

        curr_time = time.time()
        elapsed = curr_time - prev_time
        # Two reads can land on the same clock tick; avoid dividing by zero.
        fps = 1.0 / elapsed if elapsed > 0 else 0.0
        prev_time = curr_time

        img_height, img_width, _ = frame.shape
        letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, model_size, 114)
        infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)

        outputs = rknn.inference(inputs=[infer_img])
        if outputs is None:
            continue
        loc, conf, landms = outputs

        # Decode, scale to model pixels, then undo the letterbox (offset and
        # scale) to land in original-frame pixels.
        boxes = box_decode(loc.squeeze(0), priors)
        boxes *= box_scale
        boxes[:, 0::2] = np.clip((boxes[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
        boxes[:, 1::2] = np.clip((boxes[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

        scores = conf.squeeze(0)[:, 1]

        landms = decode_landm(landms.squeeze(0), priors)
        landms *= landm_scale
        landms[:, 0::2] = np.clip((landms[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
        landms[:, 1::2] = np.clip((landms[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

        # Cheap pre-filter before NMS, then sort by descending score.
        inds = np.where(scores > 0.2)[0]
        boxes, landms, scores = boxes[inds], landms[inds], scores[inds]
        order = scores.argsort()[::-1]
        boxes, landms, scores = boxes[order], landms[order], scores[order]

        dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32)
        keep = nms(dets, 0.5)
        dets, landms = dets[keep], landms[keep]

        face_data = []
        frame_center = np.array([img_width / 2, img_height / 2])
        for det, landmark in zip(dets, landms):
            if det[4] < 0.6:
                continue
            x1, y1, x2, y2 = map(int, det[:4])
            score = det[4]
            box_center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            offset = box_center - frame_center
            face_data.append({
                "box": [x1, y1, x2, y2],
                "confidence": float(score),
                "offset_from_center": {
                    "x": float(offset[0]),
                    "y": float(offset[1])
                }
            })

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, f'{score:.4f}', (x1, y1 + 12),
                        cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
            for lx, ly in landmark.reshape(-1, 2):
                cv2.circle(frame, (int(lx), int(ly)), 1, (0, 255, 255), 2)

        cv2.putText(frame, f'FPS: {fps:.2f}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        if face_data:
            print(face_data)

        ret, buffer = cv2.imencode('.jpg', frame)
        if ret:
            # Publish frame and faces together so readers see a matching pair.
            latest_frame = buffer.tobytes()
            latest_faces = face_data


# --- Flask App ---
app = Flask(__name__)


@app.route('/')
def index():
    """Serve the landing page."""
    # NOTE(review): this body is plain text; it looks like the page markup
    # (and any script polling /speech, /faces, /video_feed) was stripped from
    # the file at some point -- confirm against the original template.
    return '''

Little Sophias Inner Thoughts

Voice-To-Text

Partial:
Final:

Face Detection

Count:
Offsets:
'''


@app.route('/video_feed')
def video_feed():
    """Stream the annotated camera frames as multipart MJPEG."""
    return Response(stream_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')


def stream_frames():
    """Yield each newly published JPEG frame as one multipart chunk.

    A frame is sent once; while nothing new is available the generator sleeps
    briefly instead of busy-waiting or re-sending the same image.
    """
    last_sent = None
    while True:
        # Snapshot the global once so the check and the yield see the same object.
        frame = latest_frame
        if not frame or frame is last_sent:
            time.sleep(0.01)
            continue
        last_sent = frame
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.route('/faces')
def get_faces():
    """Return the faces found in the most recent frame as JSON."""
    return jsonify(latest_faces)


@app.route('/speech')
def get_speech():
    """Return the latest partial and final transcripts as JSON."""
    return jsonify({
        "partial": latest_partial.get("partial", ""),
        "text": latest_result.get("text", "")
    })


# --- Start Background Thread ---
# Daemon threads: they must not keep the process alive once Flask exits.
threading.Thread(target=background_loop, daemon=True).start()
threading.Thread(target=speech_loop, daemon=True).start()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)