# Reconstructed from the diff of commit 38ccbba40e03fb36cf7300b77263059ea9fbcabb
# Author:  Jake Wilkinson
# Date:    Fri Oct 24 10:11:17 2025 +0000
# Subject: face detect and video streaming
#
# The same commit adds the binary model file RetinaFace.rknn (binary, not shown).
# File: RetinaFaceExample.py
"""RetinaFace face detection on an RKNN runtime, served over Flask.

A background thread reads frames from camera 0, runs the RetinaFace model,
annotates each frame and publishes the newest JPEG and detection list in
module-level globals.  ``/`` streams the annotated frames as MJPEG and
``/faces`` returns the latest detections as JSON.
"""
from itertools import product
from math import ceil
import threading
import time

import cv2
from flask import Flask, Response, jsonify
import numpy as np
from rknnlite.api import RKNNLite

# Detections below this score are dropped before NMS.
PRE_NMS_SCORE_THRESHOLD = 0.2
# IoU above which a lower-scored box is suppressed by NMS.
NMS_IOU_THRESHOLD = 0.5
# Detections below this score are neither drawn nor reported.
REPORT_SCORE_THRESHOLD = 0.6
# Pause used instead of spinning when there is nothing to do yet.
IDLE_SLEEP_SECONDS = 0.005


# --- RetinaFace Utilities ---
def letterbox_resize(image, size, bg_color):
    """Resize *image* to fit inside *size*, padding the rest with *bg_color*.

    Args:
        image: H x W x 3 image array.
        size: ``(target_width, target_height)`` of the output canvas.
        bg_color: Fill value for the padding.

    Returns:
        ``(canvas, aspect_ratio, offset_x, offset_y)`` where ``aspect_ratio``
        is the scale applied to the image and the offsets are the top-left
        position of the resized image inside the canvas.
    """
    target_width, target_height = size
    image_height, image_width, _ = image.shape
    aspect_ratio = min(target_width / image_width, target_height / image_height)
    # Never let a dimension collapse to 0; cv2.resize rejects empty sizes.
    new_width = max(1, int(image_width * aspect_ratio))
    new_height = max(1, int(image_height * aspect_ratio))
    image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
    result_image = np.full((target_height, target_width, 3), bg_color, dtype=np.uint8)
    offset_x = (target_width - new_width) // 2
    offset_y = (target_height - new_height) // 2
    result_image[offset_y:offset_y + new_height, offset_x:offset_x + new_width] = image
    return result_image, aspect_ratio, offset_x, offset_y


def PriorBox(image_size):
    """Build the anchor grid for a model input of *image_size*.

    Args:
        image_size: ``(height, width)`` of the model input.

    Returns:
        Array of shape ``(num_anchors, 4)`` holding ``[cx, cy, w, h]`` per
        anchor, all normalised by the input size.
    """
    height, width = image_size[0], image_size[1]
    min_sizes = [[16, 32], [64, 128], [256, 512]]
    steps = [8, 16, 32]
    anchors = []
    for step, level_sizes in zip(steps, min_sizes):
        rows, cols = ceil(height / step), ceil(width / step)
        for i, j in product(range(rows), range(cols)):
            # Anchor centre sits in the middle of the feature-map cell.
            cx = (j + 0.5) * step / width
            cy = (i + 0.5) * step / height
            for min_size in level_sizes:
                anchors.append([cx, cy, min_size / width, min_size / height])
    return np.array(anchors).reshape(-1, 4)


def box_decode(loc, priors):
    """Turn box regressions into normalised ``[x1, y1, x2, y2]`` boxes.

    Args:
        loc: ``(N, 4)`` regression output ``[dx, dy, dw, dh]``.
        priors: ``(N, 4)`` anchors as produced by :func:`PriorBox`.

    Returns:
        ``(N, 4)`` array of corner-format boxes in the priors' coordinate space.
    """
    variances = [0.1, 0.2]
    boxes = np.concatenate((
        priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
        priors[:, 2:] * np.exp(loc[:, 2:] * variances[1])), axis=1)
    # Convert centre/size to corner format in place.
    boxes[:, :2] -= boxes[:, 2:] / 2
    boxes[:, 2:] += boxes[:, :2]
    return boxes


def decode_landm(pre, priors):
    """Turn landmark regressions into five normalised ``(x, y)`` points.

    Args:
        pre: ``(N, 10)`` regression output, five ``(dx, dy)`` pairs per row.
        priors: ``(N, 4)`` anchors as produced by :func:`PriorBox`.

    Returns:
        ``(N, 10)`` array laid out as ``[x0, y0, ..., x4, y4]``.
    """
    variances = [0.1, 0.2]
    count = pre.shape[0]
    points = pre[:, :10].reshape(count, 5, 2)
    centers = priors[:, None, :2]
    sizes = priors[:, None, 2:]
    landmarks = centers + points * variances[0] * sizes
    return landmarks.reshape(count, 10)


def nms(dets, thresh):
    """Greedy non-maximum suppression.

    Args:
        dets: ``(N, 5)`` array of ``[x1, y1, x2, y2, score]`` rows.
        thresh: IoU above which a lower-scored box is discarded.

    Returns:
        List of row indices into *dets* to keep, highest score first.
    """
    x1, y1, x2, y2, scores = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3], dets[:, 4]
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = scores.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        rest = order[1:]
        keep.append(i)
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        ovr = inter / (areas[i] + areas[rest] - inter)
        order = rest[np.where(ovr <= thresh)[0]]
    return keep


# --- RKNN Initialization ---
rknn = RKNNLite()
# Fail fast: a model that did not load would otherwise only surface as
# opaque errors inside the inference loop.
if rknn.load_rknn('./RetinaFace.rknn') != 0:
    raise RuntimeError('Failed to load RKNN model ./RetinaFace.rknn')
if rknn.init_runtime() != 0:
    raise RuntimeError('Failed to initialise the RKNN runtime')

# --- Shared State ---
# Written by background_loop, read by the Flask handlers.
latest_frame = None   # JPEG bytes of the newest annotated frame
latest_faces = []     # detection dicts belonging to that frame
cap = cv2.VideoCapture(0)


def _decode_detections(outputs, priors, model_size, frame_shape,
                       aspect_ratio, offset_x, offset_y):
    """Decode raw model outputs into frame-space detections after NMS.

    Returns ``(dets, landms)``: ``dets`` rows are ``[x1, y1, x2, y2, score]``
    and ``landms`` rows hold five ``(x, y)`` landmark points.
    """
    # NOTE(review): assumes outputs are (1, N, 4), (1, N, 2), (1, N, 10);
    # the squeeze(0) / column indexing below relies on that - confirm against the model.
    loc, conf, landms = outputs
    img_height, img_width = frame_shape[0], frame_shape[1]

    boxes = box_decode(loc.squeeze(0), priors)
    boxes *= np.array([model_size[1], model_size[0], model_size[1], model_size[0]])
    # Undo the letterbox: remove the padding offset, then the scale.
    boxes[:, 0::2] = np.clip((boxes[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
    boxes[:, 1::2] = np.clip((boxes[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

    scores = conf.squeeze(0)[:, 1]
    landms = decode_landm(landms.squeeze(0), priors)
    landms *= np.tile(np.array([model_size[1], model_size[0]]), 5)
    landms[:, 0::2] = np.clip((landms[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
    landms[:, 1::2] = np.clip((landms[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

    inds = np.where(scores > PRE_NMS_SCORE_THRESHOLD)[0]
    boxes, landms, scores = boxes[inds], landms[inds], scores[inds]
    order = scores.argsort()[::-1]
    boxes, landms, scores = boxes[order], landms[order], scores[order]

    dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32)
    keep = nms(dets, NMS_IOU_THRESHOLD)
    return dets[keep], landms[keep]


# --- Background Inference Loop ---
def background_loop():
    """Capture, detect, annotate and publish frames forever.

    Updates the module globals ``latest_frame`` (JPEG bytes) and
    ``latest_faces`` (list of detection dicts) once per processed frame.
    """
    global latest_frame, latest_faces
    model_size = (320, 320)
    priors = PriorBox(model_size)
    prev_time = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            # Back off instead of spinning at full speed on a dead camera.
            time.sleep(IDLE_SLEEP_SECONDS)
            continue

        curr_time = time.time()
        elapsed = curr_time - prev_time
        # Coarse clocks can report identical timestamps for two frames.
        fps = 1.0 / elapsed if elapsed > 0 else 0.0
        prev_time = curr_time

        img_height, img_width, _ = frame.shape
        letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, model_size, 114)
        # NOTE(review): the frame is fed in OpenCV's BGR channel order;
        # confirm this matches what the converted model expects.
        infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)

        outputs = rknn.inference(inputs=[infer_img])
        if outputs is None:
            continue

        dets, landms = _decode_detections(
            outputs, priors, model_size, frame.shape,
            aspect_ratio, offset_x, offset_y)

        face_data = []
        frame_center = np.array([img_width / 2, img_height / 2])

        for data, landmark in zip(dets, landms):
            score = data[4]
            if score < REPORT_SCORE_THRESHOLD:
                continue
            x1, y1, x2, y2 = map(int, data[:4])
            box_center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            offset = box_center - frame_center
            face_data.append({
                "box": [x1, y1, x2, y2],
                "confidence": float(score),
                "offset_from_center": {
                    "x": float(offset[0]),
                    "y": float(offset[1])
                }
            })
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, f'{score:.4f}', (x1, y1 + 12), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
            for j in range(5):
                lx, ly = map(int, landmark[j*2:j*2+2])
                cv2.circle(frame, (lx, ly), 1, (0, 255, 255), 2)

        cv2.putText(frame, f'FPS: {fps:.2f}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        if face_data:
            print(face_data)
        ret, buffer = cv2.imencode('.jpg', frame)
        if ret:
            latest_frame = buffer.tobytes()
            latest_faces = face_data


# --- Flask App ---
app = Flask(__name__)


@app.route('/')
def index():
    """Stream the annotated camera feed as multipart MJPEG."""
    return Response(stream_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')


def stream_frames():
    """Yield each newly published JPEG frame as one multipart chunk."""
    last_sent = None
    while True:
        # Read the global once so the check and the payload use the same object.
        frame = latest_frame
        # background_loop publishes a fresh bytes object per frame, so an
        # identity check is enough to tell "already sent" from "new".
        if not frame or frame is last_sent:
            time.sleep(IDLE_SLEEP_SECONDS)
            continue
        last_sent = frame
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.route('/faces')
def get_faces():
    """Return the detections of the most recently processed frame as JSON."""
    return jsonify(latest_faces)


# --- Start Background Thread ---
# Started at import time (not only under __main__) so the detector also runs
# when the app object is served by an external WSGI runner.
threading.Thread(target=background_loop, daemon=True).start()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)