little_sophia_brain/main.py

191 lines
7.0 KiB
Python

from flask import Flask, Response, jsonify
import cv2
import numpy as np
from math import ceil
from itertools import product
from rknnlite.api import RKNNLite
import threading
import time
# --- RetinaFace Utilities ---
def letterbox_resize(image, size, bg_color):
    """Scale ``image`` to fit inside ``size`` without distortion and pad the rest.

    Args:
        image: 3-D image array; its shape is unpacked as (height, width, channels).
        size: ``(target_width, target_height)`` of the output canvas.
        bg_color: Fill value for the padded border.

    Returns:
        Tuple ``(canvas, scale, offset_x, offset_y)``: the padded uint8 image,
        the scale factor applied to the input, and the top-left position of
        the resized image inside the canvas.
    """
    target_width, target_height = size
    image_height, image_width, _ = image.shape
    # The smaller of the two ratios makes the whole image fit in the canvas.
    aspect_ratio = min(target_width / image_width, target_height / image_height)
    # Clamp to 1 px: int() truncation can yield 0 for very elongated inputs,
    # and cv2.resize rejects an empty destination size.
    new_width = max(1, int(image_width * aspect_ratio))
    new_height = max(1, int(image_height * aspect_ratio))
    image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
    # np.full pins the canvas dtype to uint8 regardless of the type of
    # bg_color and avoids the temporary array of a ones-times-colour product.
    result_image = np.full((target_height, target_width, 3), bg_color, dtype=np.uint8)
    # Centre the resized image; integer division puts any odd pixel on the
    # bottom/right side.
    offset_x = (target_width - new_width) // 2
    offset_y = (target_height - new_height) // 2
    result_image[offset_y:offset_y + new_height, offset_x:offset_x + new_width] = image
    return result_image, aspect_ratio, offset_x, offset_y
def PriorBox(image_size):
    """Build the normalized anchor (prior) boxes for a given input size.

    Args:
        image_size: Indexable where ``[0]`` normalizes the y axis and ``[1]``
            normalizes the x axis (i.e. ``(height, width)``).

    Returns:
        Float array of shape ``(num_anchors, 4)`` with rows
        ``[cx, cy, width, height]``, each divided by the matching image
        dimension. Anchors are ordered by stride (8, 16, 32), then grid row,
        then grid column, then anchor size.
    """
    height, width = image_size[0], image_size[1]
    strides = (8, 16, 32)
    sizes_per_level = ((16, 32), (64, 128), (256, 512))

    rows = []
    for stride, level_sizes in zip(strides, sizes_per_level):
        grid_rows = ceil(height / stride)
        grid_cols = ceil(width / stride)
        for row in range(grid_rows):
            for col in range(grid_cols):
                # Anchor centres sit in the middle of each feature-map cell.
                center_x = (col + 0.5) * stride / width
                center_y = (row + 0.5) * stride / height
                for side in level_sizes:
                    rows.append((center_x, center_y, side / width, side / height))
    return np.array(rows).reshape(-1, 4)
def box_decode(loc, priors):
    """Convert regression offsets plus anchors into corner-form boxes.

    Args:
        loc: 2-D array; columns 0-1 are centre offsets and the remaining
            columns are log-scale size offsets.
        priors: 2-D array of anchors; columns 0-1 are the centre and the
            remaining columns are the size.

    Returns:
        Array with rows ``[x1, y1, x2, y2]`` in the same units as ``priors``.
    """
    center_variance, size_variance = 0.1, 0.2
    anchor_center = priors[:, :2]
    anchor_size = priors[:, 2:]

    centers = anchor_center + loc[:, :2] * center_variance * anchor_size
    sizes = anchor_size * np.exp(loc[:, 2:] * size_variance)

    # Centre/size form -> top-left and bottom-right corners.
    top_left = centers - sizes / 2
    bottom_right = sizes + top_left
    return np.concatenate((top_left, bottom_right), axis=1)
def decode_landm(pre, priors):
    """Convert landmark regression offsets plus anchors into point coordinates.

    Args:
        pre: 2-D array whose first ten columns are five ``(x, y)`` offset pairs.
        priors: 2-D array of anchors; columns 0-1 are the centre and the
            remaining columns are the size.

    Returns:
        Array with ten columns ``[x0, y0, ..., x4, y4]`` in the same units as
        ``priors``.
    """
    center_variance = 0.1
    anchor_center = priors[:, :2]
    anchor_size = priors[:, 2:]
    # Each landmark is an offset from the anchor centre, scaled by anchor size.
    points = [
        anchor_center + pre[:, start:start + 2] * center_variance * anchor_size
        for start in range(0, 10, 2)
    ]
    return np.concatenate(points, axis=1)
def nms(dets, thresh):
    """Greedy non-maximum suppression over scored boxes.

    Args:
        dets: 2-D array whose columns 0-3 are ``x1, y1, x2, y2`` and whose
            column 4 is the score.
        thresh: A box is discarded when its overlap ratio with an
            already-kept box is greater than this value.

    Returns:
        List of row indices into ``dets`` that survive, highest score first.
    """
    left, top, right, bottom = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
    # The +1 treats coordinates as inclusive pixel indices.
    box_areas = (right - left + 1) * (bottom - top + 1)
    remaining = dets[:, 4].argsort()[::-1]

    keep = []
    while remaining.size > 0:
        best, rest = remaining[0], remaining[1:]
        keep.append(best)

        overlap_w = np.maximum(
            0.0,
            np.minimum(right[best], right[rest]) - np.maximum(left[best], left[rest]) + 1,
        )
        overlap_h = np.maximum(
            0.0,
            np.minimum(bottom[best], bottom[rest]) - np.maximum(top[best], top[rest]) + 1,
        )
        intersection = overlap_w * overlap_h
        iou = intersection / (box_areas[best] + box_areas[rest] - intersection)

        # Carry forward only the candidates that do not overlap too much.
        remaining = rest[iou <= thresh]
    return keep
# --- RKNN Initialization ---
rknn = RKNNLite()
# load_rknn() and init_runtime() signal failure through a non-zero return
# code rather than an exception, so check them here and fail fast instead of
# letting the first inference call fail later.
if rknn.load_rknn('./RetinaFace.rknn') != 0:
    raise RuntimeError('Failed to load RKNN model: ./RetinaFace.rknn')
if rknn.init_runtime() != 0:
    raise RuntimeError('Failed to initialize the RKNN runtime')
# --- Shared State ---
# Written by background_loop and read by the Flask handlers.
latest_frame = None  # JPEG-encoded bytes of the newest annotated frame
latest_faces = []  # list of per-face dicts for the newest frame
cap = cv2.VideoCapture(0)
# --- Background Inference Loop ---
def _detect_faces(frame, priors, model_size):
    """Run the detector on one frame and return NMS-filtered results.

    Returns ``(dets, landms)`` in frame pixel coordinates, where ``dets`` has
    rows ``[x1, y1, x2, y2, score]`` and ``landms`` has ten columns of
    landmark coordinates, or ``None`` when inference produced no output.
    """
    img_height, img_width, _ = frame.shape
    letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, model_size, 114)
    infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)
    outputs = rknn.inference(inputs=[infer_img])
    if outputs is None:
        return None
    loc, conf, landms = outputs
    scores = conf.squeeze(0)[:, 1]

    # Normalized -> model pixels -> undo the letterbox (offset, then scale),
    # clipped to the original frame.
    boxes = box_decode(loc.squeeze(0), priors)
    boxes *= np.array([model_size[1], model_size[0], model_size[1], model_size[0]])
    boxes[:, 0::2] = np.clip((boxes[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
    boxes[:, 1::2] = np.clip((boxes[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

    landms = decode_landm(landms.squeeze(0), priors)
    landms *= np.tile(np.array([model_size[1], model_size[0]]), 5)
    landms[:, 0::2] = np.clip((landms[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
    landms[:, 1::2] = np.clip((landms[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

    # Cheap pre-filter before NMS; the stricter 0.6 cut is applied later.
    inds = np.where(scores > 0.2)[0]
    boxes, landms, scores = boxes[inds], landms[inds], scores[inds]
    order = scores.argsort()[::-1]
    boxes, landms, scores = boxes[order], landms[order], scores[order]
    dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32)
    keep = nms(dets, 0.5)
    return dets[keep], landms[keep]


def _annotate_faces(frame, dets, landms):
    """Draw confident detections onto ``frame`` in place and describe them.

    Returns a list with one dict per face scoring at least 0.6, holding its
    box, confidence and the offset of the box centre from the frame centre.
    """
    img_height, img_width, _ = frame.shape
    frame_center = np.array([img_width / 2, img_height / 2])
    face_data = []
    for data, landmark in zip(dets, landms):
        if data[4] < 0.6:
            continue
        x1, y1, x2, y2 = map(int, data[:4])
        confidence = data[4]
        box_center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
        offset = box_center - frame_center
        face_data.append({
            "box": [x1, y1, x2, y2],
            "confidence": float(confidence),
            "offset_from_center": {
                "x": float(offset[0]),
                "y": float(offset[1])
            }
        })
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
        cv2.putText(frame, f'{confidence:.4f}', (x1, y1 + 12),
                    cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
        for j in range(5):
            lx, ly = map(int, landmark[j * 2:j * 2 + 2])
            cv2.circle(frame, (lx, ly), 1, (0, 255, 255), 2)
    return face_data


def background_loop():
    """Capture, detect and publish frames forever.

    Each iteration reads a frame from ``cap``, runs face detection, draws the
    results and an FPS counter onto the frame, then stores the JPEG bytes in
    ``latest_frame`` and the face descriptions in ``latest_faces``.
    """
    global latest_frame, latest_faces
    model_size = (320, 320)
    priors = PriorBox(model_size)
    # A monotonic clock keeps the FPS figure immune to wall-clock adjustments.
    prev_time = time.monotonic()
    while True:
        ret, frame = cap.read()
        if not ret:
            # Back off briefly so a missing or stalled camera does not pin a core.
            time.sleep(0.01)
            continue
        curr_time = time.monotonic()
        elapsed = curr_time - prev_time
        # Two reads can land on the same clock tick; avoid dividing by zero.
        fps = 1.0 / elapsed if elapsed > 0 else 0.0
        prev_time = curr_time

        detections = _detect_faces(frame, priors, model_size)
        if detections is None:
            continue
        dets, landms = detections
        face_data = _annotate_faces(frame, dets, landms)

        cv2.putText(frame, f'FPS: {fps:.2f}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        if face_data:
            print(face_data)
        ret, buffer = cv2.imencode('.jpg', frame)
        if ret:
            latest_frame = buffer.tobytes()
        latest_faces = face_data
# --- Flask App ---
app = Flask(__name__)


@app.route('/')
def index():
    """Stream the frames from ``stream_frames`` as a multipart response."""
    frames = stream_frames()
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')
def stream_frames():
    """Yield each newly published JPEG frame as one multipart chunk.

    Runs forever. A frame is emitted once; while no new frame is available
    the generator sleeps briefly instead of busy-waiting or re-sending the
    same image.
    """
    last_sent = None
    while True:
        # Snapshot the global once so the check and the payload use the same object.
        frame = latest_frame
        if not frame or frame is last_sent:
            time.sleep(0.01)
            continue
        last_sent = frame
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
@app.route('/faces')
def get_faces():
    """Return the current contents of ``latest_faces`` as a JSON response."""
    faces = latest_faces
    return jsonify(faces)
# --- Start Background Thread ---
# Started at module import, outside the __main__ guard, so the loop runs
# whether or not this file is the entry point. As a daemon thread it does not
# keep the interpreter alive on shutdown.
_worker = threading.Thread(target=background_loop, daemon=True)
_worker.start()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)