# little_sophia_brain/main.py
#
# 284 lines
# 9.7 KiB
# Python

from flask import Flask, Response, jsonify
import cv2
import numpy as np
from math import ceil
from itertools import product
from rknnlite.api import RKNNLite
import threading
import sounddevice as sd
import queue
import json
from vosk import Model, KaldiRecognizer
import time
# Most recent in-progress recognition payload. speech_loop rebinds this name
# to each new partial result and the /speech handler reads its "partial" key.
latest_partial = {"partial": ""}
# Most recent finalized recognition payload. speech_loop rebinds this name to
# each completed result and the /speech handler reads its "text" key.
latest_result = {"text": ""}
def speech_loop():
    """Transcribe microphone audio forever and publish the results.

    Opens a 16 kHz mono int16 raw input stream, feeds every captured chunk to
    a Vosk recognizer, and rebinds the module-level ``latest_result`` /
    ``latest_partial`` dicts with the decoded JSON payloads. Never returns;
    intended to run on a daemon thread.
    """
    global latest_partial, latest_result

    recognizer = KaldiRecognizer(Model("./vosk-model-small-en-us-0.15"), 16000)
    audio_chunks = queue.Queue()

    def enqueue_audio(indata, frames, time_info, status):
        # Runs on the audio driver's thread: report any status flag, then hand
        # the raw bytes to the recognizer loop through the queue.
        if status:
            print(status)
        audio_chunks.put(bytes(indata))

    stream = sd.RawInputStream(
        samplerate=16000,
        blocksize=8000,
        dtype='int16',
        channels=1,
        callback=enqueue_audio,
    )
    with stream:
        while True:
            chunk = audio_chunks.get()
            if not recognizer.AcceptWaveform(chunk):
                # Utterance still in progress: publish the running hypothesis.
                partial = json.loads(recognizer.PartialResult())
                latest_partial = partial
                print("...", partial.get("partial", ""), end='\r')
                continue
            # Utterance finished: publish the final transcription.
            result = json.loads(recognizer.Result())
            latest_result = result
            print(".", result)
# --- RetinaFace Utilities ---
def letterbox_resize(image, size, bg_color):
    """Resize ``image`` to fit inside ``size`` and pad the rest with ``bg_color``.

    The image is scaled uniformly (aspect ratio preserved) so that it fits
    within the target, then centred on a padded canvas.

    Args:
        image: Array whose ``shape`` unpacks as (height, width, channels).
        size: Target ``(width, height)``.
        bg_color: Fill value for the padding area.

    Returns:
        Tuple ``(result_image, aspect_ratio, offset_x, offset_y)`` where
        ``aspect_ratio`` is the scale factor applied to the input and the
        offsets are the top-left position of the resized image on the canvas.
    """
    target_width, target_height = size
    image_height, image_width, _ = image.shape

    # The smaller of the two ratios guarantees both dimensions fit.
    aspect_ratio = min(target_width / image_width, target_height / image_height)
    new_width = int(image_width * aspect_ratio)
    new_height = int(image_height * aspect_ratio)
    image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)

    # np.full pins the canvas dtype to uint8; `np.ones(...) * bg_color` lets the
    # result dtype depend on the fill value and on the NumPy version.
    result_image = np.full((target_height, target_width, 3), bg_color, dtype=np.uint8)

    offset_x = (target_width - new_width) // 2
    offset_y = (target_height - new_height) // 2
    result_image[offset_y:offset_y + new_height, offset_x:offset_x + new_width] = image
    return result_image, aspect_ratio, offset_x, offset_y
def PriorBox(image_size):
    """Build the normalised anchor (prior) boxes for a given input size.

    Three feature levels are used, with strides 8, 16 and 32 and two anchor
    sizes per level. For every feature-map cell (row-major order) one anchor
    per size is emitted, centred on the cell.

    Args:
        image_size: Indexable pair; element 0 normalises y values and element
            1 normalises x values (i.e. ``(height, width)``).

    Returns:
        Float array of shape ``(num_anchors, 4)`` with rows
        ``[cx, cy, width, height]``, all divided by the image dimensions.
    """
    height, width = image_size[0], image_size[1]
    strides = [8, 16, 32]
    sizes_per_level = [[16, 32], [64, 128], [256, 512]]

    rows = []
    for stride, level_sizes in zip(strides, sizes_per_level):
        grid_rows = ceil(height / stride)
        grid_cols = ceil(width / stride)
        for row in range(grid_rows):
            center_y = (row + 0.5) * stride / height
            for col in range(grid_cols):
                center_x = (col + 0.5) * stride / width
                for side in level_sizes:
                    rows.append([center_x, center_y, side / width, side / height])

    # reshape keeps the (0, 4) shape when no anchors were generated.
    return np.array(rows).reshape(-1, 4)
def box_decode(loc, priors):
    """Turn regression offsets into corner-form boxes.

    Args:
        loc: Array of per-anchor offsets; columns 0-1 shift the centre and
            columns 2 onward scale the size (log-space).
        priors: Array of anchors with rows ``[cx, cy, w, h]``.

    Returns:
        New array with rows ``[x1, y1, x2, y2]`` in the same units as
        ``priors``. Neither input is modified.
    """
    center_variance, size_variance = 0.1, 0.2
    prior_centers, prior_sizes = priors[:, :2], priors[:, 2:]

    centers = prior_centers + loc[:, :2] * center_variance * prior_sizes
    sizes = prior_sizes * np.exp(loc[:, 2:] * size_variance)

    # Convert centre/size form into top-left and bottom-right corners.
    top_left = centers - sizes / 2
    bottom_right = sizes + top_left
    return np.concatenate((top_left, bottom_right), axis=1)
def decode_landm(pre, priors):
    """Turn landmark regression offsets into point coordinates.

    Args:
        pre: Array with ten columns per anchor, read as five (x, y) offset
            pairs.
        priors: Array of anchors with rows ``[cx, cy, w, h]``.

    Returns:
        Array with ten columns per anchor: five decoded (x, y) points, each
        computed as the anchor centre plus the offset scaled by 0.1 and by the
        anchor size.
    """
    center_variance = 0.1
    prior_centers, prior_sizes = priors[:, :2], priors[:, 2:]
    points = [
        prior_centers + pre[:, start:start + 2] * center_variance * prior_sizes
        for start in range(0, 10, 2)
    ]
    return np.concatenate(points, axis=1)
def nms(dets, thresh):
    """Greedy non-maximum suppression.

    Args:
        dets: Array with rows ``[x1, y1, x2, y2, score]``.
        thresh: Candidates whose IoU with an already-kept box exceeds this
            value are discarded.

    Returns:
        List of row indices into ``dets`` to keep, highest score first.
    """
    left, top, right, bottom, confidences = (dets[:, col] for col in range(5))
    # Widths and heights use the inclusive-pixel (+1) convention.
    box_areas = (right - left + 1) * (bottom - top + 1)

    remaining = confidences.argsort()[::-1]
    selected = []
    while remaining.size > 0:
        best, rest = remaining[0], remaining[1:]
        selected.append(best)

        overlap_w = np.maximum(
            0.0, np.minimum(right[best], right[rest]) - np.maximum(left[best], left[rest]) + 1
        )
        overlap_h = np.maximum(
            0.0, np.minimum(bottom[best], bottom[rest]) - np.maximum(top[best], top[rest]) + 1
        )
        intersection = overlap_w * overlap_h
        iou = intersection / (box_areas[best] + box_areas[rest] - intersection)

        # Only candidates that do not overlap the kept box too much survive.
        remaining = rest[iou <= thresh]
    return selected
# --- RKNN Initialization ---
# Load the RetinaFace model and bring up the runtime once at import time.
# Both calls report failure through a non-zero return code rather than an
# exception, so check them here instead of failing later inside the
# background inference thread.
rknn = RKNNLite()
if rknn.load_rknn('./RetinaFace.rknn') != 0:
    raise RuntimeError("Failed to load RKNN model './RetinaFace.rknn'")
if rknn.init_runtime() != 0:
    raise RuntimeError("Failed to initialise the RKNN runtime")
# --- Shared State ---
# Both names below are rebound by background_loop and read by the Flask
# handlers (stream_frames and get_faces).
# JPEG-encoded bytes of the most recent annotated frame; None until the first
# frame has been encoded.
latest_frame = None
# Per-face dicts ("box", "confidence", "offset_from_center") for the most
# recent frame.
latest_faces = []
# Capture handle for device index 0, consumed by background_loop.
cap = cv2.VideoCapture(0)
# --- Background Inference Loop ---
def background_loop():
    """Capture frames, detect faces, and publish the annotated results forever.

    For every frame read from ``cap`` this letterboxes it to the model input
    size, runs RKNN inference, decodes boxes and landmarks back into frame
    coordinates, filters them (score > 0.2, NMS at IoU 0.5, final score
    >= 0.6), draws the detections and an FPS counter on the frame, and then
    rebinds the module-level ``latest_frame`` (JPEG bytes) and
    ``latest_faces`` (list of per-face dicts). Never returns; intended to run
    on a daemon thread.
    """
    global latest_frame, latest_faces
    model_size = (320, 320)
    priors = PriorBox(model_size)

    # Loop-invariant factors that turn normalised coordinates into model-input
    # pixels: (x, y, x, y) for boxes and five (x, y) pairs for landmarks.
    box_scale = np.array([model_size[1], model_size[0], model_size[1], model_size[0]])
    landmark_scale = np.tile(np.array([model_size[1], model_size[0]]), 5)

    prev_time = time.time()
    while True:
        ret, frame = cap.read()
        if not ret:
            # Back off briefly instead of spinning a core while the camera
            # has no frame to deliver.
            time.sleep(0.01)
            continue

        curr_time = time.time()
        elapsed = curr_time - prev_time
        # Two reads can land on the same clock tick; avoid dividing by zero.
        fps = 1.0 / elapsed if elapsed > 0 else 0.0
        prev_time = curr_time

        img_height, img_width, _ = frame.shape
        letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, model_size, 114)
        infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)
        outputs = rknn.inference(inputs=[infer_img])
        if outputs is None:
            continue
        loc, conf, landms = outputs

        # Boxes: normalised -> model pixels -> undo letterbox -> clip to frame.
        boxes = box_decode(loc.squeeze(0), priors)
        boxes *= box_scale
        boxes[:, 0::2] = np.clip((boxes[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
        boxes[:, 1::2] = np.clip((boxes[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

        # Column 1 of the classification output is used as the face score.
        scores = conf.squeeze(0)[:, 1]

        # Landmarks go through the same coordinate mapping as the boxes.
        landms = decode_landm(landms.squeeze(0), priors)
        landms *= landmark_scale
        landms[:, 0::2] = np.clip((landms[:, 0::2] - offset_x) / aspect_ratio, 0, img_width)
        landms[:, 1::2] = np.clip((landms[:, 1::2] - offset_y) / aspect_ratio, 0, img_height)

        # Cheap pre-filter before NMS, then sort by descending score.
        inds = np.where(scores > 0.2)[0]
        boxes, landms, scores = boxes[inds], landms[inds], scores[inds]
        order = scores.argsort()[::-1]
        boxes, landms, scores = boxes[order], landms[order], scores[order]

        dets = np.hstack((boxes, scores[:, np.newaxis])).astype(np.float32)
        keep = nms(dets, 0.5)
        dets, landms = dets[keep], landms[keep]

        face_data = []
        frame_center = np.array([img_width / 2, img_height / 2])
        for data, landmark in zip(dets, landms):
            # Stricter threshold for what is actually reported and drawn.
            if data[4] < 0.6:
                continue
            x1, y1, x2, y2 = map(int, data[:4])
            # Separate name so the model's `conf` output is not shadowed.
            confidence = data[4]
            box_center = np.array([(x1 + x2) / 2, (y1 + y2) / 2])
            offset = box_center - frame_center
            face_data.append({
                "box": [x1, y1, x2, y2],
                "confidence": float(confidence),
                "offset_from_center": {
                    "x": float(offset[0]),
                    "y": float(offset[1])
                }
            })
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, f'{confidence:.4f}', (x1, y1 + 12), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
            for j in range(5):
                lx, ly = map(int, landmark[j*2:j*2+2])
                cv2.circle(frame, (lx, ly), 1, (0, 255, 255), 2)

        cv2.putText(frame, f'FPS: {fps:.2f}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        if face_data:
            print(face_data)

        ret, buffer = cv2.imencode('.jpg', frame)
        if ret:
            latest_frame = buffer.tobytes()
        latest_faces = face_data
# --- Flask App ---
# Application object on which the route handlers below are registered and
# which is served by app.run() when the module is executed as a script.
app = Flask(__name__)
@app.route('/')
def index():
    """Serve the dashboard page.

    The page embeds the ``/video_feed`` stream and runs a script that polls
    ``/speech`` and ``/faces`` every 500 ms to refresh the transcription text,
    the face count, and each face's offset from the frame centre.
    """
    page = '''
<h2>Little Sophias Inner Thoughts</h2>
<img src="/video_feed" width="640" />
<h3>Voice-To-Text</h3>
<div><strong>Partial:</strong> <span id="partial"></span></div>
<div><strong>Final:</strong> <span id="final"></span></div>
<h3>Face Detection</h3>
<div><strong>Count:</strong> <span id="face_count"></span></div>
<div><strong>Offsets:</strong>
<ul id="face_offsets"></ul>
</div>
<script>
async function pollSpeech() {
const res = await fetch('/speech');
const data = await res.json();
document.getElementById('partial').innerText = data.partial || '';
document.getElementById('final').innerText = data.text || '';
}
async function pollFaces() {
const res = await fetch('/faces');
const data = await res.json();
document.getElementById('face_count').innerText = data.length;
const list = document.getElementById('face_offsets');
list.innerHTML = '';
data.forEach(face => {
const offset = face.offset_from_center;
const li = document.createElement('li');
li.innerText = `x: ${offset.x.toFixed(1)}, y: ${offset.y.toFixed(1)}`;
list.appendChild(li);
});
}
function loop() {
pollSpeech();
pollFaces();
setTimeout(loop, 500);
}
loop();
</script>
'''
    return page
@app.route('/video_feed')
def video_feed():
    """Stream the annotated frames as a multipart response.

    Each part produced by ``stream_frames`` is delimited by the ``frame``
    boundary declared in the mimetype.
    """
    multipart_type = 'multipart/x-mixed-replace; boundary=frame'
    frame_parts = stream_frames()
    return Response(frame_parts, mimetype=multipart_type)
def stream_frames():
    """Yield each newly published JPEG frame as one multipart chunk.

    Reads the module-level ``latest_frame`` and emits it wrapped in the
    ``--frame`` boundary and a JPEG content-type header. A frame is sent only
    once; while no new frame is available the generator sleeps briefly rather
    than spinning or re-sending the same image in a tight loop.

    Yields:
        ``bytes`` for one multipart part (boundary, header, JPEG data, CRLF).
    """
    last_sent = None
    while True:
        # Snapshot the global once so the check and the send use the same
        # object even if the capture thread publishes a new frame in between.
        frame = latest_frame
        if frame and frame is not last_sent:
            last_sent = frame
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
        else:
            time.sleep(0.01)
@app.route('/faces')
def get_faces():
    """Return the most recently published face list as JSON."""
    faces = latest_faces
    return jsonify(faces)
@app.route('/speech')
def get_speech():
    """Return the current partial and final transcription text as JSON.

    Either field falls back to an empty string when the corresponding payload
    has no such key.
    """
    partial_text = latest_partial.get("partial", "")
    final_text = latest_result.get("text", "")
    payload = {"partial": partial_text, "text": final_text}
    return jsonify(payload)
# --- Start Background Thread ---
# Both workers are launched as daemon threads when the module is loaded, so
# they never block interpreter shutdown. The vision loop starts first, then
# the speech loop.
for worker in (background_loop, speech_loop):
    threading.Thread(target=worker, daemon=True).start()

if __name__ == '__main__':
    # Serve on every interface, port 5000.
    app.run(host='0.0.0.0', port=5000)