rknn_heat_test/heat_generator.py

533 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Heat Generator - CPU & NPU Stress Test
For testing cooling solutions on Rockchip boards
With real-time temperature graphing
"""
import os
import sys
import time
import glob
import signal
import argparse
import multiprocessing
from multiprocessing import Process, Value
from ctypes import c_bool
from collections import deque
from io import StringIO
# Check for numpy (used for heavier CPU load)
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# Check for RKNN Lite (NPU)
try:
from rknnlite.api import RKNNLite
HAS_RKNN = True
except ImportError:
HAS_RKNN = False
# Check for plotext (graphing)
try:
import plotext as plt
HAS_PLOTEXT = True
except ImportError:
HAS_PLOTEXT = False
# ANSI escape codes (SGR sequences) used to colorize/emphasize terminal output
RED = "\033[91m"  # bright red foreground
GREEN = "\033[92m"  # bright green foreground
YELLOW = "\033[93m"  # bright yellow foreground
CYAN = "\033[96m"  # bright cyan foreground
RESET = "\033[0m"  # reset all text attributes
BOLD = "\033[1m"  # bold / increased intensity
# Flicker-free display: cursor-movement and erase sequences used to redraw
# the dashboard in place instead of scrolling
CURSOR_HOME = "\033[H"  # move cursor to the top-left corner
CLEAR_SCREEN = "\033[2J"  # erase the whole screen
CLEAR_TO_END = "\033[J"  # erase from the cursor to the end of the screen
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
# Alternate screen buffer (like vim/htop use)
ALT_SCREEN_ON = "\033[?1049h"
ALT_SCREEN_OFF = "\033[?1049l"
def get_terminal_size() -> tuple[int, int]:
    """Return the current terminal size as ``(columns, rows)``.

    Returns:
        The size reported by ``shutil.get_terminal_size``, or ``(80, 24)``
        when the size cannot be determined.
    """
    import shutil

    try:
        size = shutil.get_terminal_size(fallback=(80, 24))
    except (OSError, ValueError):
        # Only swallow genuine "cannot query the terminal" failures; a bare
        # ``except`` here would also eat KeyboardInterrupt / SystemExit.
        return 80, 24
    return size.columns, size.lines
# Graph colors: line colors for the temperature plot. TemperatureHistory.get_graph
# assigns them to sensors in order and wraps around when there are more
# sensors than colors.
GRAPH_COLORS = ['red', 'cyan', 'green', 'yellow', 'magenta']
class TemperatureHistory:
"""Track temperature history for graphing"""
def __init__(self, max_samples: int = 120):
self.max_samples = max_samples
self.history: dict[str, deque] = {}
self.time_points: deque = deque(maxlen=max_samples)
self.start_time = time.time()
self.start_temps: dict[str, float] = {}
self.max_temps: dict[str, float] = {}
self.min_temps: dict[str, float] = {}
def update(self, temps: dict[str, float]):
"""Add new temperature readings"""
current_time = time.time() - self.start_time
self.time_points.append(current_time)
for name, temp in temps.items():
if name not in self.history:
self.history[name] = deque(maxlen=self.max_samples)
self.start_temps[name] = temp
self.max_temps[name] = temp
self.min_temps[name] = temp
self.history[name].append(temp)
self.max_temps[name] = max(self.max_temps[name], temp)
self.min_temps[name] = min(self.min_temps[name], temp)
def get_graph(self, height: int = 12) -> str:
"""Generate temperature graph as string"""
if not HAS_PLOTEXT or len(self.time_points) < 2:
return self._get_ascii_graph()
try:
plt.clear_figure()
plt.clear_data()
plt.theme('dark')
plt.title("🌡️ Temperature History")
plt.xlabel("Time (seconds)")
plt.ylabel("Temp (°C)")
time_list = list(self.time_points)
for i, (name, temps) in enumerate(self.history.items()):
temps_list = list(temps)
friendly = name.replace('-thermal', '').replace('_thermal', '').upper()
color = GRAPH_COLORS[i % len(GRAPH_COLORS)]
min_len = min(len(time_list), len(temps_list))
if min_len > 1:
plt.plot(
time_list[-min_len:],
temps_list[-min_len:],
label=friendly,
color=color
)
# Set y-axis range
all_temps = [t for temps in self.history.values() for t in temps]
if all_temps:
min_temp = max(0, min(all_temps) - 5)
max_temp = min(105, max(all_temps) + 5)
plt.ylim(min_temp, max_temp)
plt.plotsize(None, height)
plt.build()
return plt.active().build()
except Exception:
return self._get_ascii_graph()
def _get_ascii_graph(self) -> str:
"""Fallback ASCII sparkline graph"""
lines = []
lines.append("\n📊 Temperature History")
lines.append("-" * 50)
chars = " ▁▂▃▄▅▆▇█"
for name, temps in self.history.items():
temps_list = list(temps)[-50:]
if not temps_list:
continue
friendly = name.replace('-thermal', '').replace('_thermal', '').upper()
min_t, max_t = min(temps_list), max(temps_list)
range_t = max_t - min_t if max_t > min_t else 1
sparkline = ""
for t in temps_list:
idx = int((t - min_t) / range_t * (len(chars) - 1))
sparkline += chars[idx]
lines.append(f"{friendly:8s} [{min_t:5.1f}-{max_t:5.1f}°C] {sparkline}")
return "\n".join(lines)
def cpu_stress_worker(worker_id: int, running: Value):
    """Keep one CPU core busy until ``running.value`` turns false.

    With numpy available, each pass multiplies two random 500x500 float32
    matrices and applies sin/cos/exp to them. Without numpy, it runs
    trial-division primality checks over 2..1000000 in a cycle.

    Args:
        worker_id: Index of this worker; accepted but not used in the body.
        running: Shared flag; the loop exits once its ``value`` is false.
    """
    if not HAS_NUMPY:
        # Pure-Python fallback load.
        def is_prime(candidate):
            if candidate < 2:
                return False
            limit = int(candidate ** 0.5) + 1
            return all(candidate % divisor != 0 for divisor in range(2, limit))

        candidate = 2
        while running.value:
            is_prime(candidate)
            # Wrap back to 2 after testing 1000000.
            candidate = candidate + 1 if candidate < 1000000 else 2
        return

    dim = 500
    while running.value:
        left = np.random.rand(dim, dim).astype(np.float32)
        right = np.random.rand(dim, dim).astype(np.float32)
        product = np.dot(left, right)
        # Results are discarded on purpose; only the computation matters.
        np.sin(left)
        np.cos(right)
        np.exp(product / 1000)
def check_npu_available() -> bool:
    """Report whether any known NPU device or platform path exists.

    Returns:
        True if ``/dev/rknpu`` or one of the listed ``/sys/devices/platform``
        NPU nodes exists, otherwise False.
    """
    candidate_paths = (
        '/dev/rknpu',
        '/sys/devices/platform/fde40000.npu',
        '/sys/devices/platform/fdab0000.npu',
    )
    return any(os.path.exists(candidate) for candidate in candidate_paths)
def gpu_stress_worker(running: Value):
    """Heavy compute loop used as a stand-in when no NPU is present.

    Despite the name, this function only issues numpy operations (matrix
    product, 2-D FFT, singular values of a 256x256 slice); no GPU API is
    called here. It returns immediately when numpy is not installed.

    Args:
        running: Shared flag; the loop exits once its ``value`` is false.
    """
    if not HAS_NUMPY:
        # ``np`` is undefined when the numpy import failed; without this guard
        # the first iteration would crash the worker with NameError.
        return
    size = 1024
    while running.value:
        a = np.random.rand(size, size).astype(np.float32)
        b = np.random.rand(size, size).astype(np.float32)
        # Results are discarded on purpose; only the computation matters.
        np.dot(a, b)
        np.fft.fft2(a)
        np.linalg.svd(a[:256, :256], compute_uv=False)
def npu_stress_worker(model_path: str, running: Value):
    """NPU stress worker - runs continuous inference on the NPU.

    Behaviour, in order:
      * returns at once if RKNN Lite is not importable;
      * runs ``gpu_stress_worker`` instead if no NPU device path is found;
      * returns if ``model_path`` is empty or does not exist;
      * otherwise loads the model and runs inference on random uint8 input
        until ``running.value`` becomes false.

    While the RKNN runtime is in use, file descriptors of stdout and stderr
    are pointed at the null device, then restored on every exit path.

    Args:
        model_path: Path to the ``.rknn`` model file.
        running: Shared flag; the inference loop exits once ``value`` is false.
    """
    if not HAS_RKNN:
        return
    if not check_npu_available():
        gpu_stress_worker(running)
        return
    # Validate the model path before touching file descriptors or the runtime.
    if not model_path or not os.path.exists(model_path):
        return

    # Suppress ALL RKNN output (it writes to both stdout and stderr at C level),
    # so redirect at the file-descriptor level rather than via sys.stdout.
    stdout_fd = sys.stdout.fileno()
    stderr_fd = sys.stderr.fileno()
    saved_stdout = os.dup(stdout_fd)
    saved_stderr = os.dup(stderr_fd)
    try:
        devnull_fd = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(devnull_fd, stdout_fd)
            os.dup2(devnull_fd, stderr_fd)
        finally:
            os.close(devnull_fd)
        _run_npu_inference(model_path, running)
    finally:
        # Restore stdout and stderr
        os.dup2(saved_stdout, stdout_fd)
        os.dup2(saved_stderr, stderr_fd)
        os.close(saved_stdout)
        os.close(saved_stderr)


def _run_npu_inference(model_path, running):
    """Load ``model_path`` and run inference until ``running.value`` is false."""
    # Input layouts probed in order; the first one inference accepts is used.
    candidate_shapes = (
        (1, 320, 320, 3),
        (1, 3, 320, 320),
        (320, 320, 3),
        (1, 224, 224, 3),
        (1, 3, 224, 224),
    )
    rknn = RKNNLite()
    # A non-zero return code is treated as failure.
    if rknn.load_rknn(model_path) != 0:
        return
    try:
        if rknn.init_runtime() != 0:
            return
        input_shape = None
        for shape in candidate_shapes:
            try:
                probe = np.random.randint(0, 255, shape, dtype=np.uint8)
                rknn.inference(inputs=[probe])
            except Exception:
                # Shape rejected; try the next layout. (A bare ``except`` here
                # would also swallow KeyboardInterrupt / SystemExit.)
                continue
            input_shape = shape
            break
        if input_shape is None:
            return
        while running.value:
            frame = np.random.randint(0, 255, input_shape, dtype=np.uint8)
            rknn.inference(inputs=[frame])
    finally:
        # Release the runtime on every path, including an exception raised by
        # inference inside the main loop.
        rknn.release()
def get_temperatures(thermal_path: str = "/sys/class/thermal") -> dict[str, float]:
    """Read the current temperature of every thermal zone.

    Args:
        thermal_path: Directory containing ``thermal_zone*`` sub-directories,
            each with a ``type`` and a ``temp`` file. Defaults to the Linux
            sysfs location.

    Returns:
        Mapping of each zone's ``type`` string to its ``temp`` value divided
        by 1000. Zones whose files are missing, unreadable or non-numeric are
        skipped.
    """
    temps: dict[str, float] = {}
    # Escape the root so glob metacharacters in the path are taken literally.
    pattern = os.path.join(glob.escape(thermal_path), "thermal_zone*")
    for zone_path in glob.glob(pattern):
        try:
            with open(os.path.join(zone_path, "type")) as f:
                zone_type = f.read().strip()
            with open(os.path.join(zone_path, "temp")) as f:
                temps[zone_type] = int(f.read().strip()) / 1000.0
        except (OSError, ValueError):
            # Best effort: an unreadable or malformed zone must not prevent
            # reporting the others.
            continue
    return temps
def get_temp_color(temp: float) -> str:
    """Pick the ANSI color for a temperature reading.

    Returns ``RED`` at 80 and above, ``YELLOW`` at 60 and above, and
    ``GREEN`` for anything else.
    """
    # Thresholds are checked from hottest to coolest; first match wins.
    for threshold, color in ((80, RED), (60, YELLOW)):
        if temp >= threshold:
            return color
    return GREEN
def get_status_icon(temp: float) -> str:
    """Pick the status emoji for a temperature reading.

    Returns a red circle at 80 and above, a yellow circle at 60 and above,
    and a green circle for anything else.
    """
    # Thresholds are checked from hottest to coolest; first match wins.
    for threshold, icon in ((80, "🔴"), (60, "🟡")):
        if temp >= threshold:
            return icon
    return "🟢"
def main():
    """Command-line entry point: run the stress workers and a live dashboard.

    Parses the CLI options, starts one process per CPU worker (unless
    ``--npu-only``) and one NPU worker (unless ``--cpu-only`` or RKNN Lite is
    missing), then redraws a temperature dashboard about 20 times per second
    on the alternate screen until Ctrl+C/SIGTERM or the ``--time`` limit.
    Afterwards the workers are stopped and a summary table is printed.
    """
    parser = argparse.ArgumentParser(
        description="🔥 Heat Generator - Stress test CPU & NPU with live temperature graph"
    )
    parser.add_argument('-c', '--cpu-only', action='store_true', help='Only stress CPU')
    parser.add_argument('-n', '--npu-only', action='store_true', help='Only stress NPU')
    parser.add_argument('-w', '--workers', type=int, default=0, help='Number of CPU workers (default: all cores)')
    parser.add_argument('-m', '--model', type=str, default='/home/radxa/little_sophia_brain/RetinaFace.rknn', help='RKNN model path')
    parser.add_argument('-t', '--time', type=int, default=0, help='Run for N seconds (0 = until Ctrl+C)')
    parser.add_argument('--no-graph', action='store_true', help='Disable temperature graph')
    args = parser.parse_args()

    num_cpus = multiprocessing.cpu_count()
    num_workers = args.workers if args.workers > 0 else num_cpus
    cpu_worker_count = 0 if args.npu_only else num_workers
    # An NPU worker is only spawned when RKNN Lite imported, so the status
    # line must not claim "Active" without it.
    start_npu_worker = not args.cpu_only and HAS_RKNN
    npu_active = start_npu_worker and check_npu_available()

    # Shared flag for stopping workers
    running = Value(c_bool, True)
    processes = []

    # Signal handler: only flips the shared flag so every loop exits cleanly.
    def signal_handler(sig, frame):
        running.value = False

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Layout budget used to size the graph:
    #   Header: 4 lines (═══, title, ═══, blank)
    #   Status: 2 lines (info, blank)
    #   Table:  5 lines (header, rule, sensor1, sensor2, rule)
    #   Footer: 2 lines (blank, Ctrl+C)
    #   Graph overhead: ~4 lines (title, y-label area, x-axis, x-label)
    # => plot area = rows - 13 - 4 = rows - 17, never below 6.
    fixed_rows = 17
    min_graph_height = 6

    rule = '═' * 60
    # Erase-to-end-of-line: appended to every rendered line so a line that
    # gets shorter between frames leaves no stale characters behind.
    clear_eol = "\033[K"

    # Initialize display - use alternate screen buffer for clean full-screen display
    sys.stdout.write(ALT_SCREEN_ON + HIDE_CURSOR + CLEAR_SCREEN + CURSOR_HOME)
    sys.stdout.flush()

    # Temperature history for graphing (~30 seconds at 20 updates/sec)
    history = TemperatureHistory(max_samples=600)
    start_time = time.time()
    try:
        # Start workers silently
        for i in range(cpu_worker_count):
            p = Process(target=cpu_stress_worker, args=(i, running))
            p.start()
            processes.append(p)
        if start_npu_worker:
            p = Process(target=npu_stress_worker, args=(args.model, running))
            p.start()
            processes.append(p)

        end_time = start_time + args.time if args.time > 0 else float('inf')
        # Graph caching - only update graph every 0.2s (5fps) for performance
        cached_graph = ""
        last_graph_time = 0.0
        graph_update_interval = 0.2

        # Main display loop
        while running.value and time.time() < end_time:
            temps = get_temperatures()
            history.update(temps)
            now = time.time()
            elapsed = now - start_time
            # Re-read the size every frame so padding and graph height follow
            # terminal resizes.
            _, term_rows = get_terminal_size()

            # Build entire output in memory first
            lines = [
                f"{BOLD}{RED}{rule}{RESET}",
                f"{BOLD}{RED} 🔥 HEAT GENERATOR - STRESS TEST 🔥{RESET}",
                f"{BOLD}{RED}{rule}{RESET}",
                "",
            ]

            # System info
            status_line = f" {CYAN}CPU Workers:{RESET} {cpu_worker_count} "
            status_line += f"{CYAN}NPU:{RESET} {'✅ Active' if npu_active else '❌ Off'} "
            status_line += f"{CYAN}Elapsed:{RESET} {elapsed:.1f}s"
            if args.time > 0:
                # Clamp so the last frame never shows a negative countdown.
                remaining = max(0.0, args.time - elapsed)
                status_line += f" {CYAN}Remaining:{RESET} {remaining:.1f}s"
            lines.append(status_line)
            lines.append("")

            # Current temperatures table
            lines.append(f" {'Sensor':<12} {'Current':>10} {'Start':>10} {'Min':>8} {'Max':>8}")
            lines.append(f" {'-' * 52}")
            for name, temp in temps.items():
                friendly = _sensor_label(name)
                icon = get_status_icon(temp)
                color = get_temp_color(temp)
                start_t = history.start_temps.get(name, temp)
                min_t = history.min_temps.get(name, temp)
                max_t = history.max_temps.get(name, temp)
                lines.append(f" {icon} {friendly:<10} {color}{temp:>7.1f}°C{RESET} {start_t:>7.1f}°C {min_t:>6.1f}° {max_t:>6.1f}°")
            lines.append(f" {'-' * 52}")

            # Temperature graph - only regenerate every 0.2s for performance
            if (
                not args.no_graph
                and len(history.time_points) > 2
                and now - last_graph_time >= graph_update_interval
            ):
                graph_height = max(min_graph_height, term_rows - fixed_rows)
                cached_graph = history.get_graph(height=graph_height)
                last_graph_time = now
            if cached_graph:
                lines.extend(cached_graph.split("\n"))

            # Footer
            lines.append("")
            lines.append(f" Press {BOLD}Ctrl+C{RESET} to stop")

            # Pad to exactly fill terminal (add lines at the end)
            if len(lines) < term_rows:
                lines.extend([""] * (term_rows - len(lines)))

            # Single atomic write: home + content + clear remainder
            output = (clear_eol + "\n").join(lines) + clear_eol
            sys.stdout.write(CURSOR_HOME + output + CLEAR_TO_END)
            sys.stdout.flush()
            time.sleep(0.05)  # 20 updates per second
    finally:
        running.value = False
        # Exit alternate screen and restore cursor
        sys.stdout.write(ALT_SCREEN_OFF + SHOW_CURSOR)
        sys.stdout.flush()
        # Wait for workers
        for p in processes:
            p.join(timeout=2)
            if p.is_alive():
                p.terminate()
                p.join(timeout=1)
            if p.is_alive():
                # Workers started via fork inherit the SIGTERM handler
                # installed above, which merely clears the flag, so
                # terminate() may not stop a stuck worker. Escalate.
                p.kill()
                p.join(timeout=1)

    # Final summary on clean screen
    _print_summary(history, time.time() - start_time)


def _sensor_label(zone_type):
    """Return the display label for a sensor name (``soc-thermal`` -> ``SOC``)."""
    return zone_type.replace('-thermal', '').replace('_thermal', '').upper()


def _print_summary(history, elapsed):
    """Print final/start/peak temperature and rise per sensor after the run."""
    rule = '═' * 60
    temps = get_temperatures()
    print(f"{GREEN}{rule}{RESET}")
    print(f"{GREEN} ✅ Heat generation stopped after {elapsed:.1f}s{RESET}")
    print(f"{GREEN}{rule}{RESET}\n")
    print(f" {'Sensor':<12} {'Final':>10} {'Start':>10} {'Peak':>10} {'Rise':>10}")
    print(f" {'-' * 54}")
    for name, temp in temps.items():
        friendly = _sensor_label(name)
        color = get_temp_color(temp)
        start_t = history.start_temps.get(name, temp)
        max_t = history.max_temps.get(name, temp)
        rise = max_t - start_t
        # ``+`` in the format spec keeps the sign attached to the number.
        print(f" {friendly:<12} {color}{temp:>7.1f}°C{RESET} {start_t:>7.1f}°C {max_t:>7.1f}°C {rise:>+7.1f}°C")
    print()
# Run the stress test only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()