face detect and video streaming

master
Jake Wilkinson 2025-10-24 10:11:17 +00:00
commit 38ccbba40e
2 changed files with 190 additions and 0 deletions

BIN
RetinaFace.rknn Normal file

Binary file not shown.

190
RetinaFaceExample.py Normal file
View File

@ -0,0 +1,190 @@
from flask import Flask, Response, jsonify
import cv2
import numpy as np
from math import ceil
from itertools import product
from rknnlite.api import RKNNLite
import threading
import time
# --- RetinaFace Utilities ---
def letterbox_resize(image, size, bg_color):
target_width, target_height = size
image_height, image_width, _ = image.shape
aspect_ratio = min(target_width / image_width, target_height / image_height)
new_width = int(image_width * aspect_ratio)
new_height = int(image_height * aspect_ratio)
image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
result_image = np.ones((target_height, target_width, 3), dtype=np.uint8) * bg_color
offset_x = (target_width - new_width) // 2
offset_y = (target_height - new_height) // 2
result_image[offset_y:offset_y + new_height, offset_x:offset_x + new_width] = image
return result_image, aspect_ratio, offset_x, offset_y
def PriorBox(image_size):
anchors = []
min_sizes = [[16, 32], [64, 128], [256, 512]]
steps = [8, 16, 32]
feature_maps = [[ceil(image_size[0] / step), ceil(image_size[1] / step)] for step in steps]
for k, f in enumerate(feature_maps):
min_sizes_ = min_sizes[k]
for i, j in product(range(f[0]), range(f[1])):
for min_size in min_sizes_:
s_kx = min_size / image_size[1]
s_ky = min_size / image_size[0]
dense_cx = [x * steps[k] / image_size[1] for x in [j + 0.5]]
dense_cy = [y * steps[k] / image_size[0] for y in [i + 0.5]]
for cy, cx in product(dense_cy, dense_cx):
anchors += [cx, cy, s_kx, s_ky]
return np.array(anchors).reshape(-1, 4)
def box_decode(loc, priors):
variances = [0.1, 0.2]
boxes = np.concatenate((
priors[:, :2] + loc[:, :2] * variances[0] * priors[:, 2:],
priors[:, 2:] * np.exp(loc[:, 2:] * variances[1])), axis=1)
boxes[:, :2] -= boxes[:, 2:] / 2
boxes[:, 2:] += boxes[:, :2]
return boxes
def decode_landm(pre, priors):
variances = [0.1, 0.2]
landmarks = np.concatenate((
priors[:, :2] + pre[:, 0:2] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 2:4] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 4:6] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 6:8] * variances[0] * priors[:, 2:],
priors[:, :2] + pre[:, 8:10] * variances[0] * priors[:, 2:]
), axis=1)
return landmarks
def nms(dets, thresh):
x1, y1, x2, y2, scores = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3], dets[:, 4]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
order = scores.argsort()[::-1]
keep = []
while order.size > 0:
i = order[0]
keep.append(i)
xx1 = np.maximum(x1[i], x1[order[1:]])
yy1 = np.maximum(y1[i], y1[order[1:]])
xx2 = np.minimum(x2[i], x2[order[1:]])
yy2 = np.minimum(y2[i], y2[order[1:]])
w = np.maximum(0.0, xx2 - xx1 + 1)
h = np.maximum(0.0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / (areas[i] + areas[order[1:]] - inter)
inds = np.where(ovr <= thresh)[0]
order = order[inds + 1]
return keep
# --- RKNN Initialization ---
rknn = RKNNLite()
rknn.load_rknn('./RetinaFace.rknn')
rknn.init_runtime()
# --- Shared State ---
latest_frame = None
latest_faces = []
cap = cv2.VideoCapture(0)
# --- Background Inference Loop ---
def background_loop():
global latest_frame, latest_faces
model_size = (320, 320)
priors = PriorBox(model_size)
prev_time = time.time()
while True:
ret, frame = cap.read()
if not ret:
continue
curr_time = time.time()
fps = 1.0 / (curr_time - prev_time)
prev_time = curr_time
img_height, img_width, _ = frame.shape
letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, model_size, 114)
infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)
outputs = rknn.inference(inputs=[infer_img])
if outputs is None:
continue
loc, conf, landms = outputs
boxes = box_decode(loc.squeeze(0), priors)
boxes *= np.array([model_size[1], model_size[0], model_size[1], model_size[0]])
boxes[:, 0::2] = np.clip((boxes[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
boxes[:, 1::2] = np.clip((boxes[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)
scores = conf.squeeze(0)[:, 1]
landms = decode_landm(landms.squeeze(0), priors)
landms *= np.tile(np.array([model_size[1], model_size[0]]), 5)
landms[:, 0::2] = np.clip((landms[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
landms[:, 1::2] = np.clip((landms[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)
inds = np.where(scores > 0.2)[0]
boxes, landms, scores = boxes[inds], landms[inds], scores[inds]
order = scores.argsort()[::-1]
boxes, landms, scores = boxes[order], landms[order], scores[order]
dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32)
keep = nms(dets, 0.5)
dets, landms = dets[keep], landms[keep]
face_data = []
frame_center = np.array([img_width / 2, img_height / 2])
for data, landmark in zip(dets, landms):
if data[4] < 0.6:
continue
x1, y1, x2, y2 = map(int, data[:4])
conf = data[4]
box_center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
offset = box_center - frame_center
face_data.append({
"box": [x1, y1, x2, y2],
"confidence": float(conf),
"offset_from_center": {
"x": float(offset[0]),
"y": float(offset[1])
}
})
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
cv2.putText(frame, f'{conf:.4f}', (x1, y1 + 12), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
for j in range(5):
lx, ly = map(int, landmark[j*2:j*2+2])
cv2.circle(frame, (lx, ly), 1, (0, 255, 255), 2)
cv2.putText(frame, f'FPS: {fps:.2f}', (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
if len(face_data) > 0:
print(face_data)
ret, buffer = cv2.imencode('.jpg', frame)
if ret:
latest_frame = buffer.tobytes()
latest_faces = face_data
# --- Flask App ---
app = Flask(__name__)
@app.route('/')
def index():
return Response(stream_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')
def stream_frames():
while True:
if latest_frame:
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + latest_frame + b'\r\n')
@app.route('/faces')
def get_faces():
return jsonify(latest_faces)
# --- Start Background Thread ---
threading.Thread(target=background_loop, daemon=True).start()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)