serial_proxy/serial_proxy.py

246 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""
Serial port proxy (MITM): connects to a real COM device and exposes a virtual COM port
for the Arduino IDE. Records all traffic both ways while passing it through unchanged.
"""
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
import serial
import serial.tools.list_ports
# Default baud rate (Arduino Uno/Nano typically use 115200 for serial monitor)
DEFAULT_BAUD = 115200
def list_ports():
"""Return list of (port, description) for all COM ports."""
return [
(p.device, f"{p.device}{p.description or 'Unknown'}")
for p in serial.tools.list_ports.comports()
]
def select_port(prompt: str, exclude: list[str] | None = None) -> str | None:
"""Show numbered list of COM ports and return selected port or None."""
ports = list_ports()
exclude = exclude or []
ports = [(dev, desc) for dev, desc in ports if dev not in exclude]
if not ports:
print("No COM ports found.")
return None
print(prompt)
for i, (dev, desc) in enumerate(ports, 1):
print(f" {i}. {desc}")
print(" 0. Cancel")
while True:
try:
choice = input("Choice: ").strip()
if choice == "0":
return None
idx = int(choice)
if 1 <= idx <= len(ports):
return ports[idx - 1][0]
except ValueError:
pass
print("Invalid choice.")
def _is_port_in_use_error(exc: BaseException) -> bool:
"""True if the exception indicates the port is already open by another process."""
msg = str(exc).lower()
return (
"in use" in msg
or "access is denied" in msg
or "permission" in msg
or "permissionerror" in msg
or "error 5" in msg
or "error 32" in msg
)
def _is_elevation_error(exc: BaseException) -> bool:
"""True if the exception is WinError 740 (operation requires elevation)."""
if isinstance(exc, OSError) and getattr(exc, "winerror", None) == 740:
return True
return "740" in str(exc) or "elevation" in str(exc).lower()
def relay_loop(
name: str,
read_ser: serial.Serial,
write_ser: serial.Serial,
direction: str,
log_file,
log_console: bool,
):
"""Read from read_ser, write to write_ser, and log each chunk. Does not close ports."""
try:
while read_ser.is_open and write_ser.is_open:
data = read_ser.read(read_ser.in_waiting or 1)
if not data:
time.sleep(0.01)
continue
write_ser.write(data)
write_ser.flush()
ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
line = f"[{ts}] {direction} ({len(data)} bytes)\n"
if log_file:
log_file.write(line)
log_file.write(" hex: " + data.hex() + "\n")
try:
log_file.write(" raw: " + repr(data) + "\n")
except Exception:
pass
log_file.flush()
if log_console:
print(line.strip())
print(" hex:", data.hex())
except (serial.SerialException, OSError) as e:
if read_ser.is_open or write_ser.is_open:
print(f"[{name}] {e}")
def main():
print("Serial proxy (MITM) — connect Arduino IDE to a virtual port while we talk to the real device.\n")
# 1) Select real device port
real_port = select_port("Select the REAL device port (e.g. your Arduino):")
if not real_port:
print("Aborted.")
return 1
# 2) Virtual port: you already have a pair (e.g. VSPE); choose which end this app uses
other_ports = [(d, desc) for d, desc in list_ports() if d != real_port]
if not other_ports:
print("No other COM ports found. You only have your device connected.")
print("Create a virtual port pair (e.g. in VSPE), then run this script again.")
return 1
print()
print("Select the virtual port that THIS APP will use (e.g. COM1 for VSPE).")
print("In Arduino IDE, open the OTHER end of the pair (e.g. COM2).\n")
virtual_port_ours = select_port("Virtual port for this app:", exclude=[real_port])
if not virtual_port_ours:
print("Aborted.")
return 1
virtual_port_ide = None
# Baud rate
try:
baud_str = input(f"\nBaud rate [{DEFAULT_BAUD}]: ").strip() or str(DEFAULT_BAUD)
baud = int(baud_str)
except ValueError:
baud = DEFAULT_BAUD
print(f"Using baud rate: {baud}")
# Log file
log_path = Path(__file__).resolve().parent / "serial_log.txt"
log_console = True
try:
log_file = open(log_path, "a", encoding="utf-8")
log_file.write("\n--- session started " + datetime.now().isoformat() + " ---\n")
log_file.write(f"real={real_port} virtual_ours={virtual_port_ours} virtual_ide={virtual_port_ide} baud={baud}\n")
log_file.flush()
except OSError as e:
print(f"Could not open log file: {e}")
log_file = None
print(f"\nLogging to: {log_path}")
print("Opening ports...")
try:
ser_real = serial.Serial(real_port, baud, timeout=0)
except (serial.SerialException, OSError) as e:
print(f"Failed to open real port {real_port}: {e}")
if _is_elevation_error(e):
print(" → Windows is requesting administrator rights. Try running this script as a NORMAL user (not 'Run as administrator').")
print(" Serial ports often work without admin; running elevated can sometimes cause this error.")
elif _is_port_in_use_error(e):
print(" → Close any app using that port (Serial Monitor, another terminal, previous script), then try again.")
if log_file:
log_file.close()
return 1
# Virtual port: retry a few times in case it's slow to become available
ser_virtual = None
for attempt in range(5):
try:
ser_virtual = serial.Serial(virtual_port_ours, baud, timeout=0)
break
except (serial.SerialException, OSError) as e:
in_use = _is_port_in_use_error(e)
if _is_elevation_error(e):
ser_real.close()
print(f"Failed to open virtual port {virtual_port_ours}: {e}")
print(" → Try running this script as a NORMAL user (not 'Run as administrator').")
if log_file:
log_file.close()
return 1
if attempt < 4:
print(f" Could not open {virtual_port_ours} (attempt {attempt + 1}/5), retrying in 2s...")
if in_use:
print(" → Close Arduino IDE Serial Monitor and any other app using that port, then wait for retry.")
time.sleep(2)
else:
ser_real.close()
print(f"Failed to open virtual port {virtual_port_ours}: {e}")
if in_use:
print("\nPort is in use by another process. Do this then run the script again:")
print(" • Close Arduino IDE Serial Monitor (and any other app using that port).")
print(" • Close any other window running this script.")
else:
print("\nMake sure the virtual port exists and is available, then run this script again.")
if log_file:
log_file.close()
return 1
if virtual_port_ide:
print(f"\n>>> In Arduino IDE, select port: {virtual_port_ide} <<<\n")
else:
print(f"\n>>> In Arduino IDE, select the OTHER end of the pair (not {virtual_port_ours}) <<<\n")
print("Relaying and recording. Press Ctrl+C to stop.\n")
# IDE -> device
t1 = threading.Thread(
target=relay_loop,
args=("IDE->device", ser_virtual, ser_real, "IDE->device", log_file, log_console),
daemon=True,
)
# device -> IDE
t2 = threading.Thread(
target=relay_loop,
args=("device->IDE", ser_real, ser_virtual, "device->IDE", log_file, log_console),
daemon=True,
)
t1.start()
t2.start()
try:
while t1.is_alive() or t2.is_alive():
t1.join(timeout=0.5)
t2.join(timeout=0.5)
except KeyboardInterrupt:
print("\nStopping...")
finally:
for s in (ser_real, ser_virtual):
try:
s.close()
except Exception:
pass
if log_file:
log_file.write("--- session ended ---\n")
log_file.close()
print("Done.")
return 0
if __name__ == "__main__":
sys.exit(main())