camera_module has full featured face detect

master
Jake Wilkinson 2025-11-09 20:21:53 +08:00
parent fbbddb4bd0
commit 195cc5fc2e
3 changed files with 71 additions and 3 deletions

13
build_cam.sh Executable file
View File

@ -0,0 +1,13 @@
#!/bin/bash
cd ~/ros2_ws
source /opt/ros/kilted/setup.bash # Use system ROS setup if needed
rm -rf build/camera_module install/camera_module log
# Build only camera_module
colcon build --packages-select camera_module --event-handlers console_direct+
# Source overlay
source install/setup.bash
camera_module camera_module

View File

@ -34,6 +34,13 @@
background: #d0f0c0; background: #d0f0c0;
transition: background 0.5s ease; transition: background 0.5s ease;
} }
#cameraImage {
display: block;
margin-top: 1em;
border: 1px solid #ccc;
}
</style> </style>
</head> </head>
@ -55,7 +62,8 @@
</div> </div>
<p>Current publish rate: <span id="currentRate">1.0</span> Hz</p> <p>Current publish rate: <span id="currentRate">1.0</span> Hz</p>
<h2>Live Camera Feed</h2> <h2>Live Camera Feed</h2>
<img id="cameraImage" width="640" height="480" alt="Camera Feed"> <img id="cameraImage" style="max-width: 100%; height: auto;" alt="Camera Feed">
<script> <script>
const ros = new ROSLIB.Ros({ url: 'ws://192.168.1.205:9090' }); const ros = new ROSLIB.Ros({ url: 'ws://192.168.1.205:9090' });

View File

@ -11,6 +11,7 @@ from cv_bridge import CvBridge
import time import time
from camera_module.threaded_node import ThreadedNode from camera_module.threaded_node import ThreadedNode
from sensor_msgs.msg import CompressedImage
from std_msgs.msg import String from std_msgs.msg import String
import rclpy import rclpy
@ -25,9 +26,11 @@ model_path = os.path.join(get_package_share_directory(package_name), 'resource',
class CamPublisher(ThreadedNode): class CamPublisher(ThreadedNode):
def __init__(self, rknn, cap): def __init__(self, rknn, cap):
super().__init__('camera_module', default_rate=5.0) super().__init__('camera_module', default_rate=5.0)
self.string_pub = self.create_publisher(String, 'camera_module/cam_topic', 10) #self.string_pub = self.create_publisher(String, 'camera_module/cam_topic', 10)
self.image_pub = self.create_publisher(CompressedImage, 'camera_module/compressed', 10) #self.image_pub = self.create_publisher(CompressedImage, 'camera_module/compressed', 10)
self.face_detect_pub = self.create_publisher(String, 'camera_module/face_data', 10) self.face_detect_pub = self.create_publisher(String, 'camera_module/face_data', 10)
self.face_image_pub = self.create_publisher(CompressedImage, 'camera_module/face_images', 10)
self.face_detect_frame_pub = self.create_publisher(CompressedImage, 'camera_module/face_detect_frame', 10)
self.rknn = rknn self.rknn = rknn
self.cap = cap self.cap = cap
self.latest_frame = None self.latest_frame = None
@ -63,6 +66,8 @@ class CamPublisher(ThreadedNode):
ret, frame = cap.read() ret, frame = cap.read()
if not ret: if not ret:
return return
raw_frame = frame.copy()
img_height, img_width, _ = frame.shape img_height, img_width, _ = frame.shape
letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, self.model_size, 114) letterbox_img, aspect_ratio, offset_x, offset_y = letterbox_resize(frame, self.model_size, 114)
infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0) infer_img = np.expand_dims(letterbox_img.astype(np.uint8), axis=0)
@ -95,6 +100,10 @@ class CamPublisher(ThreadedNode):
face_data = [] face_data = []
frame_center = np.array([img_width / 2, img_height / 2]) frame_center = np.array([img_width / 2, img_height / 2])
face_data = []
valid_dets = []
valid_landms = []
for data, landmark in zip(dets, landms): for data, landmark in zip(dets, landms):
if data[4] < 0.6: if data[4] < 0.6:
continue continue
@ -110,6 +119,13 @@ class CamPublisher(ThreadedNode):
"y": float(offset[1]) "y": float(offset[1])
} }
}) })
valid_dets.append(data)
valid_landms.append(landmark)
for data, landmark in zip(valid_dets, valid_landms):
x1, y1, x2, y2 = map(int, data[:4])
conf = data[4]
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
cv2.putText(frame, f'{conf:.4f}', (x1, y1 + 12), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255)) cv2.putText(frame, f'{conf:.4f}', (x1, y1 + 12), cv2.FONT_HERSHEY_DUPLEX, 0.5, (255, 255, 255))
for j in range(5): for j in range(5):
@ -121,10 +137,41 @@ class CamPublisher(ThreadedNode):
if len(face_data) > 0: if len(face_data) > 0:
print(face_data) print(face_data)
self.face_detect_pub.publish(String(data=str(face_data))) self.face_detect_pub.publish(String(data=str(face_data)))
# CROP FACES AND PUBLISH STITCHED IMAGE
face_crops = []
for data in dets:
if data[4] < 0.6:
continue
x1, y1, x2, y2 = map(int, data[:4])
face_crop = raw_frame[y1:y2, x1:x2]
face_crops.append(face_crop)
target_height = 100
resized_faces = [
cv2.resize(face, (int(face.shape[1] * target_height / face.shape[0]), target_height))
for face in face_crops
]
if resized_faces:
stitched = cv2.hconcat(resized_faces)
ret, stitched_buffer = cv2.imencode('.jpg', stitched)
if ret:
msg = CompressedImage()
msg.header.stamp = self.get_clock().now().to_msg()
msg.format = 'jpeg'
msg.data = stitched_buffer.tobytes()
self.face_image_pub.publish(msg)
ret, buffer = cv2.imencode('.jpg', frame) ret, buffer = cv2.imencode('.jpg', frame)
if ret: if ret:
latest_frame = buffer.tobytes() latest_frame = buffer.tobytes()
latest_faces = face_data latest_faces = face_data
msg = CompressedImage()
msg.header.stamp = self.get_clock().now().to_msg()
msg.format = 'jpeg'
msg.data = buffer.tobytes()
self.face_detect_frame_pub.publish(msg)
def destroy_node(self): def destroy_node(self):
if self.cap: if self.cap: