""" Viseme Sender - sends viseme packets to the robot over WebSocket. Usage: python viseme_sender.py [robot_ip] Default IP: 192.168.1.x (auto-discovered or specify as argument) Connects to ws://:81 Controls: 0-9, A = send viseme 0-10 Space = send neutral (viseme 0 / sil) Q / Esc = quit L = request viseme list (VLST) """ import asyncio import struct import sys try: import websockets except ImportError: print("Missing dependency. Install with: pip install websockets") sys.exit(1) # --------------------------------------------------------------------------- # Protocol helpers # --------------------------------------------------------------------------- SYNC = b'\xA5\x5A' def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int: crc = init for b in data: crc ^= b << 8 for _ in range(8): if crc & 0x8000: crc = (crc << 1) ^ 0x1021 else: crc <<= 1 crc &= 0xFFFF return crc _seq = 0 def build_packet(tag: str, payload: bytes = b'') -> bytes: global _seq tag_bytes = tag.encode('ascii')[:4].ljust(4, b'\x00') length = len(payload) header_tail = tag_bytes + struct.pack(' 1 else '192.168.1.1' uri = f'ws://{ip}:81' print(f'Connecting to {uri} ...') try: ws = await websockets.connect(uri) except Exception as e: print(f'Connection failed: {e}') return print(f'Connected to {uri}') print() print('Viseme keys:') for k, vid in sorted(KEY_MAP.items()): label = VISEMES.get(vid, f'viseme {vid}') key_label = 'Space' if k == ' ' else k.upper() print(f' [{key_label}] -> {vid}: {label}') print(' [L] -> list visemes from device') print(' [Q/Esc] -> quit') print() # Background task to receive and print responses async def receiver(): try: async for message in ws: if isinstance(message, bytes): result = parse_packet(message) if result: tag, payload = result if tag == 'VLST': print_viseme_list(payload) elif tag == 'ACK!': print(f' <- ACK') elif tag.startswith('NAC'): msg = payload.decode('ascii', errors='replace') if payload else '' print(f' <- NACK: {msg}') elif tag == 'MSGE': msg = payload.decode('ascii', errors='replace') print(f' <- MSG: {msg}') else: print(f' <- [{tag}] {len(payload)} bytes') except websockets.exceptions.ConnectionClosed: pass recv_task = asyncio.create_task(receiver()) # Input loop (runs in executor to avoid blocking) loop = asyncio.get_event_loop() try: while True: key = await loop.run_in_executor(None, get_key) if key is None or key in ('q', '\x1b'): break if key == 'l': pkt = build_packet('VLST') await ws.send(pkt) print(' -> VLST (list visemes)') continue if key in KEY_MAP: vid = KEY_MAP[key] pkt = build_packet('VSME', bytes([vid])) await ws.send(pkt) label = VISEMES.get(vid, f'viseme {vid}') print(f' -> VSME {vid}: {label}') except (KeyboardInterrupt, EOFError): pass finally: recv_task.cancel() await ws.close() print('Disconnected.') def print_viseme_list(payload: bytes): if len(payload) < 1: print(' <- VLST: (empty)') return count = payload[0] print(f' <- VLST: {count} visemes') pos = 1 for _ in range(count): if pos + 4 >= len(payload): break vid = payload[pos] label = payload[pos+1:pos+4].decode('ascii', errors='replace') motor_count = payload[pos+4] pos += 5 motors = [] for _ in range(motor_count): if pos + 3 > len(payload): break mid = payload[pos] mpos = struct.unpack(' str | None: """Block until a single key is pressed, return it lowercase.""" try: import msvcrt # Windows ch = msvcrt.getwch() return ch.lower() except ImportError: pass # Unix / macOS import tty, termios fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: tty.setraw(fd) ch = sys.stdin.read(1) return ch.lower() finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) if __name__ == '__main__': try: asyncio.run(main()) except KeyboardInterrupt: print('\nBye.')