#!/usr/bin/env python3 """ Heat Generator - CPU & NPU Stress Test For testing cooling solutions on Rockchip boards With real-time temperature graphing """ import os import sys import time import glob import signal import argparse import multiprocessing from multiprocessing import Process, Value from ctypes import c_bool from collections import deque from io import StringIO # Check for numpy (used for heavier CPU load) try: import numpy as np HAS_NUMPY = True except ImportError: HAS_NUMPY = False # Check for RKNN Lite (NPU) try: from rknnlite.api import RKNNLite HAS_RKNN = True except ImportError: HAS_RKNN = False # Check for plotext (graphing) try: import plotext as plt HAS_PLOTEXT = True except ImportError: HAS_PLOTEXT = False # ANSI escape codes RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" CYAN = "\033[96m" RESET = "\033[0m" BOLD = "\033[1m" # Flicker-free display CURSOR_HOME = "\033[H" CLEAR_SCREEN = "\033[2J" CLEAR_TO_END = "\033[J" HIDE_CURSOR = "\033[?25l" SHOW_CURSOR = "\033[?25h" # Alternate screen buffer (like vim/htop use) ALT_SCREEN_ON = "\033[?1049h" ALT_SCREEN_OFF = "\033[?1049l" def get_terminal_size() -> tuple[int, int]: """Get terminal size (columns, rows)""" try: import shutil size = shutil.get_terminal_size() return size.columns, size.lines except: return 80, 24 # Graph colors GRAPH_COLORS = ['red', 'cyan', 'green', 'yellow', 'magenta'] class TemperatureHistory: """Track temperature history for graphing""" def __init__(self, max_samples: int = 120): self.max_samples = max_samples self.history: dict[str, deque] = {} self.time_points: deque = deque(maxlen=max_samples) self.start_time = time.time() self.start_temps: dict[str, float] = {} self.max_temps: dict[str, float] = {} self.min_temps: dict[str, float] = {} def update(self, temps: dict[str, float]): """Add new temperature readings""" current_time = time.time() - self.start_time self.time_points.append(current_time) for name, temp in temps.items(): if name not in self.history: self.history[name] = deque(maxlen=self.max_samples) self.start_temps[name] = temp self.max_temps[name] = temp self.min_temps[name] = temp self.history[name].append(temp) self.max_temps[name] = max(self.max_temps[name], temp) self.min_temps[name] = min(self.min_temps[name], temp) def get_graph(self, height: int = 12) -> str: """Generate temperature graph as string""" if not HAS_PLOTEXT or len(self.time_points) < 2: return self._get_ascii_graph() try: plt.clear_figure() plt.clear_data() plt.theme('dark') plt.title("šŸŒ”ļø Temperature History") plt.xlabel("Time (seconds)") plt.ylabel("Temp (°C)") time_list = list(self.time_points) for i, (name, temps) in enumerate(self.history.items()): temps_list = list(temps) friendly = name.replace('-thermal', '').replace('_thermal', '').upper() color = GRAPH_COLORS[i % len(GRAPH_COLORS)] min_len = min(len(time_list), len(temps_list)) if min_len > 1: plt.plot( time_list[-min_len:], temps_list[-min_len:], label=friendly, color=color ) # Set y-axis range all_temps = [t for temps in self.history.values() for t in temps] if all_temps: min_temp = max(0, min(all_temps) - 5) max_temp = min(105, max(all_temps) + 5) plt.ylim(min_temp, max_temp) plt.plotsize(None, height) plt.build() return plt.active().build() except Exception: return self._get_ascii_graph() def _get_ascii_graph(self) -> str: """Fallback ASCII sparkline graph""" lines = [] lines.append("\nšŸ“Š Temperature History") lines.append("-" * 50) chars = " ā–ā–‚ā–ƒā–„ā–…ā–†ā–‡ā–ˆ" for name, temps in self.history.items(): temps_list = list(temps)[-50:] if not temps_list: continue friendly = name.replace('-thermal', '').replace('_thermal', '').upper() min_t, max_t = min(temps_list), max(temps_list) range_t = max_t - min_t if max_t > min_t else 1 sparkline = "" for t in temps_list: idx = int((t - min_t) / range_t * (len(chars) - 1)) sparkline += chars[idx] lines.append(f"{friendly:8s} [{min_t:5.1f}-{max_t:5.1f}°C] {sparkline}") return "\n".join(lines) def cpu_stress_worker(worker_id: int, running: Value): """CPU stress worker - performs intensive calculations.""" if HAS_NUMPY: size = 500 while running.value: a = np.random.rand(size, size).astype(np.float32) b = np.random.rand(size, size).astype(np.float32) c = np.dot(a, b) np.sin(a) np.cos(b) np.exp(c / 1000) else: def is_prime(n): if n < 2: return False for i in range(2, int(n ** 0.5) + 1): if n % i == 0: return False return True n = 2 while running.value: is_prime(n) n += 1 if n > 1000000: n = 2 def check_npu_available() -> bool: """Check if NPU is actually available""" if os.path.exists('/dev/rknpu'): return True npu_paths = [ '/sys/devices/platform/fde40000.npu', '/sys/devices/platform/fdab0000.npu', ] for path in npu_paths: if os.path.exists(path): return True return False def gpu_stress_worker(running: Value): """GPU stress worker - heavy compute operations""" size = 1024 while running.value: a = np.random.rand(size, size).astype(np.float32) b = np.random.rand(size, size).astype(np.float32) c = np.dot(a, b) np.fft.fft2(a) np.linalg.svd(a[:256, :256], compute_uv=False) def npu_stress_worker(model_path: str, running: Value): """NPU stress worker - runs continuous inference on the NPU.""" if not HAS_RKNN: return if not check_npu_available(): gpu_stress_worker(running) return # Suppress ALL RKNN output (it writes to both stdout and stderr at C level) # Save original file descriptors stdout_fd = sys.stdout.fileno() stderr_fd = sys.stderr.fileno() saved_stdout = os.dup(stdout_fd) saved_stderr = os.dup(stderr_fd) # Redirect both to /dev/null devnull_fd = os.open(os.devnull, os.O_WRONLY) os.dup2(devnull_fd, stdout_fd) os.dup2(devnull_fd, stderr_fd) os.close(devnull_fd) try: rknn = RKNNLite() if not model_path or not os.path.exists(model_path): return ret = rknn.load_rknn(model_path) if ret != 0: return ret = rknn.init_runtime() if ret != 0: rknn.release() return input_shapes = [ (1, 320, 320, 3), (1, 3, 320, 320), (320, 320, 3), (1, 224, 224, 3), (1, 3, 224, 224), ] input_data = None for shape in input_shapes: try: test_input = np.random.randint(0, 255, shape, dtype=np.uint8) rknn.inference(inputs=[test_input]) input_data = test_input break except: continue if input_data is None: rknn.release() return while running.value: input_data = np.random.randint(0, 255, input_data.shape, dtype=np.uint8) rknn.inference(inputs=[input_data]) rknn.release() finally: # Restore stdout and stderr os.dup2(saved_stdout, stdout_fd) os.dup2(saved_stderr, stderr_fd) os.close(saved_stdout) os.close(saved_stderr) def get_temperatures() -> dict[str, float]: """Get current temperatures""" temps = {} thermal_path = "/sys/class/thermal" for zone_path in glob.glob(f"{thermal_path}/thermal_zone*"): try: with open(f"{zone_path}/type") as f: zone_type = f.read().strip() with open(f"{zone_path}/temp") as f: temp = int(f.read().strip()) / 1000.0 temps[zone_type] = temp except: pass return temps def get_temp_color(temp: float) -> str: """Get color based on temperature""" if temp >= 80: return RED elif temp >= 60: return YELLOW return GREEN def get_status_icon(temp: float) -> str: """Get status icon based on temperature""" if temp >= 80: return "šŸ”“" elif temp >= 60: return "🟔" return "🟢" def main(): parser = argparse.ArgumentParser( description="šŸ”„ Heat Generator - Stress test CPU & NPU with live temperature graph" ) parser.add_argument('-c', '--cpu-only', action='store_true', help='Only stress CPU') parser.add_argument('-n', '--npu-only', action='store_true', help='Only stress NPU') parser.add_argument('-w', '--workers', type=int, default=0, help='Number of CPU workers (default: all cores)') parser.add_argument('-m', '--model', type=str, default='/home/radxa/little_sophia_brain/RetinaFace.rknn', help='RKNN model path') parser.add_argument('-t', '--time', type=int, default=0, help='Run for N seconds (0 = until Ctrl+C)') parser.add_argument('--no-graph', action='store_true', help='Disable temperature graph') args = parser.parse_args() num_cpus = multiprocessing.cpu_count() num_workers = args.workers if args.workers > 0 else num_cpus npu_available = check_npu_available() # Shared flag for stopping workers running = Value(c_bool, True) processes = [] # Signal handler def signal_handler(sig, frame): running.value = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Get terminal size for graph height term_cols, term_rows = get_terminal_size() # Calculate available space for graph: # Header: 4 lines (═══, title, ═══, blank) # Status: 2 lines (info, blank) # Table: 5 lines (header, ─── , sensor1, sensor2, ───) # Footer: 2 lines (blank, Ctrl+C) # Total fixed: 13 lines # Graph overhead: ~4 lines (title, y-label area, x-axis, x-label) # So graph plot area = term_rows - 13 - 4 = term_rows - 17 # Function to get graph height based on current terminal size def get_current_graph_height(): _, rows = get_terminal_size() return max(6, rows - 17) # Initialize display - use alternate screen buffer for clean full-screen display sys.stdout.write(ALT_SCREEN_ON + HIDE_CURSOR + CLEAR_SCREEN + CURSOR_HOME) sys.stdout.flush() # Temperature history for graphing (~30 seconds at 20 updates/sec) history = TemperatureHistory(max_samples=600) start_time = time.time() try: # Start workers silently if not args.npu_only: for i in range(num_workers): p = Process(target=cpu_stress_worker, args=(i, running)) p.start() processes.append(p) if not args.cpu_only and HAS_RKNN: p = Process(target=npu_stress_worker, args=(args.model, running)) p.start() processes.append(p) # Main display loop end_time = start_time + args.time if args.time > 0 else float('inf') # Graph caching - only update graph every 0.2s (5fps) for performance cached_graph = "" last_graph_time = 0 graph_update_interval = 0.2 while running.value and time.time() < end_time: temps = get_temperatures() history.update(temps) elapsed = time.time() - start_time # Build entire output in memory first lines = [] # Header lines.append(f"{BOLD}{RED}{'═' * 60}{RESET}") lines.append(f"{BOLD}{RED} šŸ”„ HEAT GENERATOR - STRESS TEST šŸ”„{RESET}") lines.append(f"{BOLD}{RED}{'═' * 60}{RESET}") lines.append("") # System info status_line = f" {CYAN}CPU Workers:{RESET} {num_workers if not args.npu_only else 0} " status_line += f"{CYAN}NPU:{RESET} {'āœ… Active' if (not args.cpu_only and npu_available) else 'āŒ Off'} " status_line += f"{CYAN}Elapsed:{RESET} {elapsed:.1f}s" if args.time > 0: remaining = args.time - elapsed status_line += f" {CYAN}Remaining:{RESET} {remaining:.1f}s" lines.append(status_line) lines.append("") # Current temperatures table lines.append(f" {'Sensor':<12} {'Current':>10} {'Start':>10} {'Min':>8} {'Max':>8}") lines.append(f" {'-' * 52}") for name, temp in temps.items(): friendly = name.replace('-thermal', '').replace('_thermal', '').upper() icon = get_status_icon(temp) color = get_temp_color(temp) start_t = history.start_temps.get(name, temp) min_t = history.min_temps.get(name, temp) max_t = history.max_temps.get(name, temp) lines.append(f" {icon} {friendly:<10} {color}{temp:>7.1f}°C{RESET} {start_t:>7.1f}°C {min_t:>6.1f}° {max_t:>6.1f}°") lines.append(f" {'-' * 52}") # Temperature graph - only regenerate every 0.2s for performance current_time = time.time() if not args.no_graph and len(history.time_points) > 2: if current_time - last_graph_time >= graph_update_interval: current_graph_height = get_current_graph_height() cached_graph = history.get_graph(height=current_graph_height) last_graph_time = current_time graph_str = cached_graph # Footer lines.append("") lines.append(f" Press {BOLD}Ctrl+C{RESET} to stop") # Build output: header content first header_content = "\n".join(lines[:len(lines)-2]) # Everything except last 2 lines footer_content = "\n".join(lines[-2:]) # Last 2 lines # Calculate total lines header_lines = header_content.count('\n') + 1 graph_lines = graph_str.count('\n') + 1 if graph_str else 0 footer_lines = footer_content.count('\n') + 1 total_content_lines = header_lines + graph_lines + footer_lines # Build complete output if graph_str: output = header_content + "\n" + graph_str + "\n" + footer_content else: output = header_content + "\n" + footer_content # Pad to exactly fill terminal (add lines at the end) current_lines = output.count('\n') + 1 if current_lines < term_rows: output += "\n" * (term_rows - current_lines) # Single atomic write: home + content + clear remainder sys.stdout.write(CURSOR_HOME + output + CLEAR_TO_END) sys.stdout.flush() time.sleep(0.05) # 20 updates per second running.value = False finally: running.value = False # Exit alternate screen and restore cursor sys.stdout.write(ALT_SCREEN_OFF + SHOW_CURSOR) sys.stdout.flush() # Wait for workers for p in processes: p.join(timeout=2) if p.is_alive(): p.terminate() # Final summary on clean screen elapsed = time.time() - start_time temps = get_temperatures() print(f"{GREEN}{'═' * 60}{RESET}") print(f"{GREEN} āœ… Heat generation stopped after {elapsed:.1f}s{RESET}") print(f"{GREEN}{'═' * 60}{RESET}\n") print(f" {'Sensor':<12} {'Final':>10} {'Start':>10} {'Peak':>10} {'Rise':>10}") print(f" {'-' * 54}") for name, temp in temps.items(): friendly = name.replace('-thermal', '').replace('_thermal', '').upper() color = get_temp_color(temp) start_t = history.start_temps.get(name, temp) max_t = history.max_temps.get(name, temp) rise = max_t - start_t print(f" {friendly:<12} {color}{temp:>7.1f}°C{RESET} {start_t:>7.1f}°C {max_t:>7.1f}°C {'+' if rise >= 0 else ''}{rise:>6.1f}°C") print() if __name__ == "__main__": main()