#!/usr/bin/env python3 """ Serial port proxy (MITM): connects to a real COM device and exposes a virtual COM port for the Arduino IDE. Records all traffic both ways while passing it through unchanged. """ import sys import threading import time from datetime import datetime from pathlib import Path import serial import serial.tools.list_ports # Default baud rate (Arduino Uno/Nano typically use 115200 for serial monitor) DEFAULT_BAUD = 115200 def list_ports(): """Return list of (port, description) for all COM ports.""" return [ (p.device, f"{p.device} — {p.description or 'Unknown'}") for p in serial.tools.list_ports.comports() ] def select_port(prompt: str, exclude: list[str] | None = None) -> str | None: """Show numbered list of COM ports and return selected port or None.""" ports = list_ports() exclude = exclude or [] ports = [(dev, desc) for dev, desc in ports if dev not in exclude] if not ports: print("No COM ports found.") return None print(prompt) for i, (dev, desc) in enumerate(ports, 1): print(f" {i}. {desc}") print(" 0. Cancel") while True: try: choice = input("Choice: ").strip() if choice == "0": return None idx = int(choice) if 1 <= idx <= len(ports): return ports[idx - 1][0] except ValueError: pass print("Invalid choice.") def _is_port_in_use_error(exc: BaseException) -> bool: """True if the exception indicates the port is already open by another process.""" msg = str(exc).lower() return ( "in use" in msg or "access is denied" in msg or "permission" in msg or "permissionerror" in msg or "error 5" in msg or "error 32" in msg ) def _is_elevation_error(exc: BaseException) -> bool: """True if the exception is WinError 740 (operation requires elevation).""" if isinstance(exc, OSError) and getattr(exc, "winerror", None) == 740: return True return "740" in str(exc) or "elevation" in str(exc).lower() # Escape sequences for ASCII display (so \r \n etc. show instead of '.') _ASCII_ESCAPES = { 0x00: "\\0", 0x07: "\\a", 0x08: "\\b", 0x09: "\\t", 0x0A: "\\n", 0x0B: "\\v", 0x0C: "\\f", 0x0D: "\\r", 0x1B: "\\e", } def _format_bytes(data: bytes) -> dict[str, str]: """Return hex, dec, and ascii representations of data for logging.""" hex_str = " ".join(f"{b:02X}" for b in data) if len(data) <= 32 else " ".join(f"{b:02X}" for b in data[:32]) + " ..." dec_str = " ".join(str(b) for b in data) if len(data) <= 48 else " ".join(str(b) for b in data[:48]) + " ..." # Printable ASCII; common controls as \r \n \t etc.; other non-printable as \xNN parts = [] for b in data: if b in _ASCII_ESCAPES: parts.append(_ASCII_ESCAPES[b]) elif 32 <= b <= 126: parts.append(chr(b)) else: parts.append(f"\\x{b:02X}") ascii_str = "".join(parts) return {"hex": hex_str, "dec": dec_str, "ascii": ascii_str} def relay_loop( name: str, read_ser: serial.Serial, write_ser: serial.Serial, direction: str, log_file, log_console: bool, log_lock: threading.Lock, ): """Read from read_ser, write to write_ser, and log each chunk. Does not close ports.""" try: while read_ser.is_open and write_ser.is_open: data = read_ser.read(read_ser.in_waiting or 1) if not data: time.sleep(0.01) continue write_ser.write(data) write_ser.flush() ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] line = f"[{ts}] {direction} ({len(data)} bytes)\n" fmts = _format_bytes(data) if log_file: with log_lock: log_file.write(line) log_file.write(f" hex: {fmts['hex']}\n") log_file.write(f" dec: {fmts['dec']}\n") log_file.write(f" ascii: {fmts['ascii']}\n") log_file.flush() if log_console: print(line.strip()) print(f" hex: {fmts['hex']}") print(f" dec: {fmts['dec']}") print(f" ascii: {fmts['ascii']}") except (serial.SerialException, OSError) as e: if read_ser.is_open or write_ser.is_open: print(f"[{name}] {e}") def main(): print("Serial proxy (MITM) — connect Arduino IDE to a virtual port while we talk to the real device.\n") # 1) Select real device port real_port = select_port("Select the REAL device port (e.g. your Arduino):") if not real_port: print("Aborted.") return 1 # 2) Virtual port: you already have a pair (e.g. VSPE); choose which end this app uses other_ports = [(d, desc) for d, desc in list_ports() if d != real_port] if not other_ports: print("No other COM ports found. You only have your device connected.") print("Create a virtual port pair (e.g. in VSPE), then run this script again.") return 1 print() print("Select the virtual port that THIS APP will use (e.g. COM1 for VSPE).") print("In Arduino IDE, open the OTHER end of the pair (e.g. COM2).\n") virtual_port_ours = select_port("Virtual port for this app:", exclude=[real_port]) if not virtual_port_ours: print("Aborted.") return 1 virtual_port_ide = None # Baud rate try: baud_str = input(f"\nBaud rate [{DEFAULT_BAUD}]: ").strip() or str(DEFAULT_BAUD) baud = int(baud_str) except ValueError: baud = DEFAULT_BAUD print(f"Using baud rate: {baud}") # Log file log_path = Path(__file__).resolve().parent / "serial_log.txt" log_console = True try: log_file = open(log_path, "a", encoding="utf-8") log_file.write("\n--- session started " + datetime.now().isoformat() + " ---\n") log_file.write(f"real={real_port} virtual_ours={virtual_port_ours} virtual_ide={virtual_port_ide} baud={baud}\n") log_file.flush() except OSError as e: print(f"Could not open log file: {e}") log_file = None print(f"\nLogging to: {log_path}") print("Opening ports...") try: ser_real = serial.Serial(real_port, baud, timeout=0) except (serial.SerialException, OSError) as e: print(f"Failed to open real port {real_port}: {e}") if _is_elevation_error(e): print(" → Windows is requesting administrator rights. Try running this script as a NORMAL user (not 'Run as administrator').") print(" Serial ports often work without admin; running elevated can sometimes cause this error.") elif _is_port_in_use_error(e): print(" → Close any app using that port (Serial Monitor, another terminal, previous script), then try again.") if log_file: log_file.close() return 1 # Virtual port: retry a few times in case it's slow to become available ser_virtual = None for attempt in range(5): try: ser_virtual = serial.Serial(virtual_port_ours, baud, timeout=0) break except (serial.SerialException, OSError) as e: in_use = _is_port_in_use_error(e) if _is_elevation_error(e): ser_real.close() print(f"Failed to open virtual port {virtual_port_ours}: {e}") print(" → Try running this script as a NORMAL user (not 'Run as administrator').") if log_file: log_file.close() return 1 if attempt < 4: print(f" Could not open {virtual_port_ours} (attempt {attempt + 1}/5), retrying in 2s...") if in_use: print(" → Close Arduino IDE Serial Monitor and any other app using that port, then wait for retry.") time.sleep(2) else: ser_real.close() print(f"Failed to open virtual port {virtual_port_ours}: {e}") if in_use: print("\nPort is in use by another process. Do this then run the script again:") print(" • Close Arduino IDE Serial Monitor (and any other app using that port).") print(" • Close any other window running this script.") else: print("\nMake sure the virtual port exists and is available, then run this script again.") if log_file: log_file.close() return 1 if virtual_port_ide: print(f"\n>>> In Arduino IDE, select port: {virtual_port_ide} <<<\n") else: print(f"\n>>> In Arduino IDE, select the OTHER end of the pair (not {virtual_port_ours}) <<<\n") print("Relaying and recording. Press Ctrl+C to stop.") print("Type a note and press Enter to annotate the log before testing an action.\n") log_lock = threading.Lock() def note_logger(): """Read lines from stdin and write them to the log as [NOTE] annotations.""" while True: try: line = input() except EOFError: break if line.strip() and log_file: ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] with log_lock: log_file.write(f"[{ts}] NOTE: {line}\n") log_file.flush() print(" (note logged)") # IDE -> device t1 = threading.Thread( target=relay_loop, args=("IDE->device", ser_virtual, ser_real, "IDE->device", log_file, log_console, log_lock), daemon=True, ) # device -> IDE t2 = threading.Thread( target=relay_loop, args=("device->IDE", ser_real, ser_virtual, "device->IDE", log_file, log_console, log_lock), daemon=True, ) t_note = threading.Thread(target=note_logger, daemon=True) t1.start() t2.start() t_note.start() try: while t1.is_alive() or t2.is_alive(): t1.join(timeout=0.5) t2.join(timeout=0.5) except KeyboardInterrupt: print("\nStopping...") finally: for s in (ser_real, ser_virtual): try: s.close() except Exception: pass if log_file: with log_lock: log_file.write("--- session ended ---\n") log_file.close() print("Done.") return 0 if __name__ == "__main__": sys.exit(main())