From 490406e3cd713bd85a35b8b318f92c0abb207ddb Mon Sep 17 00:00:00 2001 From: radxa Date: Sun, 7 Dec 2025 10:36:47 +0000 Subject: [PATCH] first --- heat_generator.py | 532 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 532 insertions(+) create mode 100755 heat_generator.py diff --git a/heat_generator.py b/heat_generator.py new file mode 100755 index 0000000..e63755f --- /dev/null +++ b/heat_generator.py @@ -0,0 +1,532 @@ +#!/usr/bin/env python3 +""" +Heat Generator - CPU & NPU Stress Test +For testing cooling solutions on Rockchip boards +With real-time temperature graphing +""" + +import os +import sys +import time +import glob +import signal +import argparse +import multiprocessing +from multiprocessing import Process, Value +from ctypes import c_bool +from collections import deque +from io import StringIO + +# Check for numpy (used for heavier CPU load) +try: + import numpy as np + HAS_NUMPY = True +except ImportError: + HAS_NUMPY = False + +# Check for RKNN Lite (NPU) +try: + from rknnlite.api import RKNNLite + HAS_RKNN = True +except ImportError: + HAS_RKNN = False + +# Check for plotext (graphing) +try: + import plotext as plt + HAS_PLOTEXT = True +except ImportError: + HAS_PLOTEXT = False + + +# ANSI escape codes +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +CYAN = "\033[96m" +RESET = "\033[0m" +BOLD = "\033[1m" + +# Flicker-free display +CURSOR_HOME = "\033[H" +CLEAR_SCREEN = "\033[2J" +CLEAR_TO_END = "\033[J" +HIDE_CURSOR = "\033[?25l" +SHOW_CURSOR = "\033[?25h" +# Alternate screen buffer (like vim/htop use) +ALT_SCREEN_ON = "\033[?1049h" +ALT_SCREEN_OFF = "\033[?1049l" + + +def get_terminal_size() -> tuple[int, int]: + """Get terminal size (columns, rows)""" + try: + import shutil + size = shutil.get_terminal_size() + return size.columns, size.lines + except: + return 80, 24 + +# Graph colors +GRAPH_COLORS = ['red', 'cyan', 'green', 'yellow', 'magenta'] + + +class TemperatureHistory: + """Track temperature history for graphing""" + + def __init__(self, max_samples: int = 120): + self.max_samples = max_samples + self.history: dict[str, deque] = {} + self.time_points: deque = deque(maxlen=max_samples) + self.start_time = time.time() + self.start_temps: dict[str, float] = {} + self.max_temps: dict[str, float] = {} + self.min_temps: dict[str, float] = {} + + def update(self, temps: dict[str, float]): + """Add new temperature readings""" + current_time = time.time() - self.start_time + self.time_points.append(current_time) + + for name, temp in temps.items(): + if name not in self.history: + self.history[name] = deque(maxlen=self.max_samples) + self.start_temps[name] = temp + self.max_temps[name] = temp + self.min_temps[name] = temp + + self.history[name].append(temp) + self.max_temps[name] = max(self.max_temps[name], temp) + self.min_temps[name] = min(self.min_temps[name], temp) + + def get_graph(self, height: int = 12) -> str: + """Generate temperature graph as string""" + if not HAS_PLOTEXT or len(self.time_points) < 2: + return self._get_ascii_graph() + + try: + plt.clear_figure() + plt.clear_data() + + plt.theme('dark') + plt.title("šŸŒ”ļø Temperature History") + plt.xlabel("Time (seconds)") + plt.ylabel("Temp (°C)") + + time_list = list(self.time_points) + + for i, (name, temps) in enumerate(self.history.items()): + temps_list = list(temps) + friendly = name.replace('-thermal', '').replace('_thermal', '').upper() + color = GRAPH_COLORS[i % len(GRAPH_COLORS)] + + min_len = min(len(time_list), len(temps_list)) + if min_len > 1: + plt.plot( + time_list[-min_len:], + temps_list[-min_len:], + label=friendly, + color=color + ) + + # Set y-axis range + all_temps = [t for temps in self.history.values() for t in temps] + if all_temps: + min_temp = max(0, min(all_temps) - 5) + max_temp = min(105, max(all_temps) + 5) + plt.ylim(min_temp, max_temp) + + plt.plotsize(None, height) + plt.build() + return plt.active().build() + except Exception: + return self._get_ascii_graph() + + def _get_ascii_graph(self) -> str: + """Fallback ASCII sparkline graph""" + lines = [] + lines.append("\nšŸ“Š Temperature History") + lines.append("-" * 50) + + chars = " ā–ā–‚ā–ƒā–„ā–…ā–†ā–‡ā–ˆ" + + for name, temps in self.history.items(): + temps_list = list(temps)[-50:] + if not temps_list: + continue + + friendly = name.replace('-thermal', '').replace('_thermal', '').upper() + min_t, max_t = min(temps_list), max(temps_list) + range_t = max_t - min_t if max_t > min_t else 1 + + sparkline = "" + for t in temps_list: + idx = int((t - min_t) / range_t * (len(chars) - 1)) + sparkline += chars[idx] + + lines.append(f"{friendly:8s} [{min_t:5.1f}-{max_t:5.1f}°C] {sparkline}") + + return "\n".join(lines) + + +def cpu_stress_worker(worker_id: int, running: Value): + """CPU stress worker - performs intensive calculations.""" + if HAS_NUMPY: + size = 500 + while running.value: + a = np.random.rand(size, size).astype(np.float32) + b = np.random.rand(size, size).astype(np.float32) + c = np.dot(a, b) + np.sin(a) + np.cos(b) + np.exp(c / 1000) + else: + def is_prime(n): + if n < 2: + return False + for i in range(2, int(n ** 0.5) + 1): + if n % i == 0: + return False + return True + + n = 2 + while running.value: + is_prime(n) + n += 1 + if n > 1000000: + n = 2 + + +def check_npu_available() -> bool: + """Check if NPU is actually available""" + if os.path.exists('/dev/rknpu'): + return True + + npu_paths = [ + '/sys/devices/platform/fde40000.npu', + '/sys/devices/platform/fdab0000.npu', + ] + for path in npu_paths: + if os.path.exists(path): + return True + + return False + + +def gpu_stress_worker(running: Value): + """GPU stress worker - heavy compute operations""" + size = 1024 + while running.value: + a = np.random.rand(size, size).astype(np.float32) + b = np.random.rand(size, size).astype(np.float32) + c = np.dot(a, b) + np.fft.fft2(a) + np.linalg.svd(a[:256, :256], compute_uv=False) + + +def npu_stress_worker(model_path: str, running: Value): + """NPU stress worker - runs continuous inference on the NPU.""" + if not HAS_RKNN: + return + + if not check_npu_available(): + gpu_stress_worker(running) + return + + # Suppress ALL RKNN output (it writes to both stdout and stderr at C level) + # Save original file descriptors + stdout_fd = sys.stdout.fileno() + stderr_fd = sys.stderr.fileno() + saved_stdout = os.dup(stdout_fd) + saved_stderr = os.dup(stderr_fd) + + # Redirect both to /dev/null + devnull_fd = os.open(os.devnull, os.O_WRONLY) + os.dup2(devnull_fd, stdout_fd) + os.dup2(devnull_fd, stderr_fd) + os.close(devnull_fd) + + try: + rknn = RKNNLite() + + if not model_path or not os.path.exists(model_path): + return + + ret = rknn.load_rknn(model_path) + if ret != 0: + return + + ret = rknn.init_runtime() + if ret != 0: + rknn.release() + return + + input_shapes = [ + (1, 320, 320, 3), + (1, 3, 320, 320), + (320, 320, 3), + (1, 224, 224, 3), + (1, 3, 224, 224), + ] + + input_data = None + for shape in input_shapes: + try: + test_input = np.random.randint(0, 255, shape, dtype=np.uint8) + rknn.inference(inputs=[test_input]) + input_data = test_input + break + except: + continue + + if input_data is None: + rknn.release() + return + + while running.value: + input_data = np.random.randint(0, 255, input_data.shape, dtype=np.uint8) + rknn.inference(inputs=[input_data]) + + rknn.release() + + finally: + # Restore stdout and stderr + os.dup2(saved_stdout, stdout_fd) + os.dup2(saved_stderr, stderr_fd) + os.close(saved_stdout) + os.close(saved_stderr) + + +def get_temperatures() -> dict[str, float]: + """Get current temperatures""" + temps = {} + thermal_path = "/sys/class/thermal" + + for zone_path in glob.glob(f"{thermal_path}/thermal_zone*"): + try: + with open(f"{zone_path}/type") as f: + zone_type = f.read().strip() + with open(f"{zone_path}/temp") as f: + temp = int(f.read().strip()) / 1000.0 + temps[zone_type] = temp + except: + pass + + return temps + + +def get_temp_color(temp: float) -> str: + """Get color based on temperature""" + if temp >= 80: + return RED + elif temp >= 60: + return YELLOW + return GREEN + + +def get_status_icon(temp: float) -> str: + """Get status icon based on temperature""" + if temp >= 80: + return "šŸ”“" + elif temp >= 60: + return "🟔" + return "🟢" + + +def main(): + parser = argparse.ArgumentParser( + description="šŸ”„ Heat Generator - Stress test CPU & NPU with live temperature graph" + ) + parser.add_argument('-c', '--cpu-only', action='store_true', help='Only stress CPU') + parser.add_argument('-n', '--npu-only', action='store_true', help='Only stress NPU') + parser.add_argument('-w', '--workers', type=int, default=0, help='Number of CPU workers (default: all cores)') + parser.add_argument('-m', '--model', type=str, default='/home/radxa/little_sophia_brain/RetinaFace.rknn', help='RKNN model path') + parser.add_argument('-t', '--time', type=int, default=0, help='Run for N seconds (0 = until Ctrl+C)') + parser.add_argument('--no-graph', action='store_true', help='Disable temperature graph') + + args = parser.parse_args() + + num_cpus = multiprocessing.cpu_count() + num_workers = args.workers if args.workers > 0 else num_cpus + npu_available = check_npu_available() + + # Shared flag for stopping workers + running = Value(c_bool, True) + processes = [] + + # Signal handler + def signal_handler(sig, frame): + running.value = False + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Get terminal size for graph height + term_cols, term_rows = get_terminal_size() + + # Calculate available space for graph: + # Header: 4 lines (═══, title, ═══, blank) + # Status: 2 lines (info, blank) + # Table: 5 lines (header, ─── , sensor1, sensor2, ───) + # Footer: 2 lines (blank, Ctrl+C) + # Total fixed: 13 lines + # Graph overhead: ~4 lines (title, y-label area, x-axis, x-label) + # So graph plot area = term_rows - 13 - 4 = term_rows - 17 + + # Function to get graph height based on current terminal size + def get_current_graph_height(): + _, rows = get_terminal_size() + return max(6, rows - 17) + + # Initialize display - use alternate screen buffer for clean full-screen display + sys.stdout.write(ALT_SCREEN_ON + HIDE_CURSOR + CLEAR_SCREEN + CURSOR_HOME) + sys.stdout.flush() + + # Temperature history for graphing (~30 seconds at 20 updates/sec) + history = TemperatureHistory(max_samples=600) + + start_time = time.time() + + try: + # Start workers silently + if not args.npu_only: + for i in range(num_workers): + p = Process(target=cpu_stress_worker, args=(i, running)) + p.start() + processes.append(p) + + if not args.cpu_only and HAS_RKNN: + p = Process(target=npu_stress_worker, args=(args.model, running)) + p.start() + processes.append(p) + + # Main display loop + end_time = start_time + args.time if args.time > 0 else float('inf') + + # Graph caching - only update graph every 0.2s (5fps) for performance + cached_graph = "" + last_graph_time = 0 + graph_update_interval = 0.2 + + while running.value and time.time() < end_time: + temps = get_temperatures() + history.update(temps) + elapsed = time.time() - start_time + + # Build entire output in memory first + lines = [] + + # Header + lines.append(f"{BOLD}{RED}{'═' * 60}{RESET}") + lines.append(f"{BOLD}{RED} šŸ”„ HEAT GENERATOR - STRESS TEST šŸ”„{RESET}") + lines.append(f"{BOLD}{RED}{'═' * 60}{RESET}") + lines.append("") + + # System info + status_line = f" {CYAN}CPU Workers:{RESET} {num_workers if not args.npu_only else 0} " + status_line += f"{CYAN}NPU:{RESET} {'āœ… Active' if (not args.cpu_only and npu_available) else 'āŒ Off'} " + status_line += f"{CYAN}Elapsed:{RESET} {elapsed:.1f}s" + if args.time > 0: + remaining = args.time - elapsed + status_line += f" {CYAN}Remaining:{RESET} {remaining:.1f}s" + lines.append(status_line) + lines.append("") + + # Current temperatures table + lines.append(f" {'Sensor':<12} {'Current':>10} {'Start':>10} {'Min':>8} {'Max':>8}") + lines.append(f" {'-' * 52}") + + for name, temp in temps.items(): + friendly = name.replace('-thermal', '').replace('_thermal', '').upper() + icon = get_status_icon(temp) + color = get_temp_color(temp) + start_t = history.start_temps.get(name, temp) + min_t = history.min_temps.get(name, temp) + max_t = history.max_temps.get(name, temp) + + lines.append(f" {icon} {friendly:<10} {color}{temp:>7.1f}°C{RESET} {start_t:>7.1f}°C {min_t:>6.1f}° {max_t:>6.1f}°") + + lines.append(f" {'-' * 52}") + + # Temperature graph - only regenerate every 0.2s for performance + current_time = time.time() + if not args.no_graph and len(history.time_points) > 2: + if current_time - last_graph_time >= graph_update_interval: + current_graph_height = get_current_graph_height() + cached_graph = history.get_graph(height=current_graph_height) + last_graph_time = current_time + + graph_str = cached_graph + + # Footer + lines.append("") + lines.append(f" Press {BOLD}Ctrl+C{RESET} to stop") + + # Build output: header content first + header_content = "\n".join(lines[:len(lines)-2]) # Everything except last 2 lines + footer_content = "\n".join(lines[-2:]) # Last 2 lines + + # Calculate total lines + header_lines = header_content.count('\n') + 1 + graph_lines = graph_str.count('\n') + 1 if graph_str else 0 + footer_lines = footer_content.count('\n') + 1 + + total_content_lines = header_lines + graph_lines + footer_lines + + # Build complete output + if graph_str: + output = header_content + "\n" + graph_str + "\n" + footer_content + else: + output = header_content + "\n" + footer_content + + # Pad to exactly fill terminal (add lines at the end) + current_lines = output.count('\n') + 1 + if current_lines < term_rows: + output += "\n" * (term_rows - current_lines) + + # Single atomic write: home + content + clear remainder + sys.stdout.write(CURSOR_HOME + output + CLEAR_TO_END) + sys.stdout.flush() + + time.sleep(0.05) # 20 updates per second + + running.value = False + + finally: + running.value = False + + # Exit alternate screen and restore cursor + sys.stdout.write(ALT_SCREEN_OFF + SHOW_CURSOR) + sys.stdout.flush() + + # Wait for workers + for p in processes: + p.join(timeout=2) + if p.is_alive(): + p.terminate() + + # Final summary on clean screen + elapsed = time.time() - start_time + temps = get_temperatures() + + print(f"{GREEN}{'═' * 60}{RESET}") + print(f"{GREEN} āœ… Heat generation stopped after {elapsed:.1f}s{RESET}") + print(f"{GREEN}{'═' * 60}{RESET}\n") + + print(f" {'Sensor':<12} {'Final':>10} {'Start':>10} {'Peak':>10} {'Rise':>10}") + print(f" {'-' * 54}") + + for name, temp in temps.items(): + friendly = name.replace('-thermal', '').replace('_thermal', '').upper() + color = get_temp_color(temp) + start_t = history.start_temps.get(name, temp) + max_t = history.max_temps.get(name, temp) + rise = max_t - start_t + + print(f" {friendly:<12} {color}{temp:>7.1f}°C{RESET} {start_t:>7.1f}°C {max_t:>7.1f}°C {'+' if rise >= 0 else ''}{rise:>6.1f}°C") + + print() + + +if __name__ == "__main__": + main()