# HansonServo/tools/viseme_sender.py
#
# 233 lines
# 6.8 KiB
# Python

"""
Viseme Sender - sends viseme packets to the robot over WebSocket.
Usage:
python viseme_sender.py [robot_ip]
Default IP: 192.168.1.1 (no auto-discovery; pass robot_ip to use another address)
Connects to ws://<ip>:81
Controls:
0-9, A-J = send viseme 0-19 (only 0-10 have names in VISEMES)
Space = send neutral (viseme 0 / sil)
Q / Esc = quit
L = request viseme list (VLST)
"""
import asyncio
import struct
import sys
# websockets is the only third-party dependency; when it is missing, print an
# install hint and exit with status 1 instead of showing a traceback.
try:
    import websockets
except ImportError:
    print("Missing dependency. Install with: pip install websockets")
    sys.exit(1)
# ---------------------------------------------------------------------------
# Protocol helpers
# ---------------------------------------------------------------------------
# Two-byte marker placed at the start of every packet by build_packet();
# parse_packet() rejects any frame that does not begin with it.
SYNC = b'\xA5\x5A'
def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
    """Return the 16-bit CRC of *data* using polynomial 0x1021, MSB first.

    Args:
        data: Bytes to checksum.
        init: Starting register value (0xFFFF by default).

    Returns:
        The CRC as an integer. For non-empty input it is always within
        0..0xFFFF; for empty input *init* is returned unchanged.
    """
    polynomial = 0x1021
    register = init
    for byte in data:
        # Feed the next byte into the high half of the 16-bit register.
        register ^= byte << 8
        for _bit in range(8):
            top_bit_set = register & 0x8000
            register <<= 1
            if top_bit_set:
                register ^= polynomial
            # Keep the register at 16 bits after every shift.
            register &= 0xFFFF
    return register
# Sequence number stamped into the next outgoing packet; wraps at 16 bits.
_seq = 0


def build_packet(tag: str, payload: bytes = b'') -> bytes:
    """Assemble one protocol frame and advance the module sequence counter.

    Frame layout (integers are little-endian uint16):
        SYNC | tag (4 bytes) | length | seq | payload | CRC

    The tag is ASCII-encoded, cut to four bytes and NUL-padded on the right.
    The CRC is crc16_ccitt() over everything between SYNC and the CRC itself.

    Args:
        tag: Packet tag; must be ASCII-encodable.
        payload: Raw payload bytes (at most 65535, the uint16 limit).

    Returns:
        The complete frame as bytes.
    """
    global _seq
    tag_field = tag.encode('ascii')[:4].ljust(4, b'\x00')
    length_and_seq = struct.pack('<HH', len(payload), _seq)
    # Everything the checksum covers: tag, length, sequence number, payload.
    checked = b''.join((tag_field, length_and_seq, payload))
    frame = b''.join((SYNC, checked, struct.pack('<H', crc16_ccitt(checked))))
    # Only advance once the frame was built successfully.
    _seq = (_seq + 1) & 0xFFFF
    return frame
def parse_packet(data: bytes):
    """Decode one protocol frame.

    Returns:
        A ``(tag, payload)`` tuple, where *tag* is the four tag bytes decoded
        as ASCII (undecodable bytes replaced) and *payload* is the raw payload.
        Returns ``None`` when the frame is too short, does not start with
        SYNC, is shorter than its length field claims, or fails the CRC check.
        The sequence number field is not inspected, and bytes after the CRC
        are ignored.
    """
    # Smallest possible frame: sync(2) + tag(4) + length(2) + seq(2) + crc(2).
    if len(data) < 12:
        return None
    if data[:2] != SYNC:
        return None

    raw_tag, length = struct.unpack_from('<4sH', data, 2)
    crc_offset = 10 + length
    if len(data) < crc_offset + 2:
        return None

    (received_crc,) = struct.unpack_from('<H', data, crc_offset)
    # The checksum covers tag, length, sequence number and payload.
    if received_crc != crc16_ccitt(data[2:crc_offset]):
        return None

    return raw_tag.decode('ascii', errors='replace'), data[10:crc_offset]
# ---------------------------------------------------------------------------
# Viseme table
# ---------------------------------------------------------------------------
# Display names for viseme ids, keyed by id (position in the tuple is the id).
# Ids without an entry are shown with a generic "viseme N" label by main().
VISEMES = dict(enumerate((
    'sil',
    'AA',
    'AE',
    'AH',
    'AO',
    'EH',
    'IH',
    'IY',
    'OW',
    'UH',
    'UW',
)))
# Keyboard key -> viseme id. The position of a key in the string is its id
# ('0'-'9' -> 0-9, 'a'-'j' -> 10-19); Space is an extra alias for id 0.
KEY_MAP = {
    **{key: vid for vid, key in enumerate('0123456789abcdefghij')},
    ' ': 0,
}
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main():
    """Connect to the robot and turn key presses into viseme packets.

    The robot address is the first command-line argument, defaulting to
    192.168.1.1; the WebSocket endpoint is ``ws://<ip>:81``. After printing
    the key legend, a background task prints every packet received from the
    device while the foreground loop reads single keys:

    * a key in KEY_MAP sends a ``VSME`` packet carrying the viseme id,
    * ``l`` sends a ``VLST`` request,
    * ``q``, Esc, or a ``None`` key ends the session.

    A connection failure is reported and the function returns. If the
    connection drops while sending, that is reported and the loop ends
    instead of propagating the exception.
    """
    ip = sys.argv[1] if len(sys.argv) > 1 else '192.168.1.1'
    uri = f'ws://{ip}:81'
    print(f'Connecting to {uri} ...')
    try:
        ws = await websockets.connect(uri)
    except Exception as e:
        # Outermost boundary of the tool: any connect failure is reported
        # to the user rather than shown as a traceback.
        print(f'Connection failed: {e}')
        return
    print(f'Connected to {uri}')
    print()
    print('Viseme keys:')
    for k, vid in sorted(KEY_MAP.items()):
        label = VISEMES.get(vid, f'viseme {vid}')
        key_label = 'Space' if k == ' ' else k.upper()
        print(f' [{key_label}] -> {vid}: {label}')
    print(' [L] -> list visemes from device')
    print(' [Q/Esc] -> quit')
    print()

    # Background task to receive and print responses
    async def receiver():
        try:
            async for message in ws:
                # Only binary frames carry protocol packets.
                if not isinstance(message, bytes):
                    continue
                result = parse_packet(message)
                if not result:
                    continue
                tag, payload = result
                if tag == 'VLST':
                    print_viseme_list(payload)
                elif tag == 'ACK!':
                    print(' <- ACK')
                elif tag.startswith('NAC'):
                    msg = payload.decode('ascii', errors='replace') if payload else ''
                    print(f' <- NACK: {msg}')
                elif tag == 'MSGE':
                    msg = payload.decode('ascii', errors='replace')
                    print(f' <- MSG: {msg}')
                else:
                    print(f' <- [{tag}] {len(payload)} bytes')
        except websockets.exceptions.ConnectionClosed:
            # The send path reports a dropped connection; nothing to do here.
            pass

    recv_task = asyncio.create_task(receiver())

    # Input loop: get_key() blocks, so it runs in the default executor to
    # keep the event loop (and the receiver task) responsive.
    loop = asyncio.get_running_loop()
    try:
        while True:
            key = await loop.run_in_executor(None, get_key)
            if key is None or key in ('q', '\x1b'):
                break
            if key == 'l':
                pkt = build_packet('VLST')
                await ws.send(pkt)
                print(' -> VLST (list visemes)')
                continue
            if key in KEY_MAP:
                vid = KEY_MAP[key]
                pkt = build_packet('VSME', bytes([vid]))
                await ws.send(pkt)
                label = VISEMES.get(vid, f'viseme {vid}')
                print(f' -> VSME {vid}: {label}')
    except (KeyboardInterrupt, EOFError):
        pass
    except websockets.exceptions.ConnectionClosed:
        # send() on a dropped connection raises; end the session cleanly.
        print('Connection lost.')
    finally:
        recv_task.cancel()
        await ws.close()
        print('Disconnected.')
def print_viseme_list(payload: bytes):
    """Print the contents of a VLST payload.

    As parsed here, the payload is a one-byte entry count followed by
    entries of: id (1 byte), label (3 bytes, decoded as ASCII), motor count
    (1 byte), then per motor an id (1 byte) and a little-endian uint16 value.

    Parsing stops quietly at the first entry header or motor record that
    does not fit in the remaining bytes; whatever was read is still printed.
    """
    if not payload:
        print(' <- VLST: (empty)')
        return

    total = len(payload)
    announced = payload[0]
    print(f' <- VLST: {announced} visemes')

    offset = 1
    for _entry in range(announced):
        # An entry header needs five bytes: id, three label bytes, motor count.
        if offset + 5 > total:
            break
        vid, raw_label, motor_count = struct.unpack_from('<B3sB', payload, offset)
        label = raw_label.decode('ascii', errors='replace')
        offset += 5

        motors = []
        for _motor in range(motor_count):
            # Each motor record is three bytes: id plus uint16 value.
            if offset + 3 > total:
                break
            mid, mpos = struct.unpack_from('<BH', payload, offset)
            motors.append(f'{mid}={mpos}')
            offset += 3

        summary = ", ".join(motors) if motors else "(none)"
        print(f' [{vid:2d}] "{label}" motors: {summary}')
# ---------------------------------------------------------------------------
# Cross-platform single key input
# ---------------------------------------------------------------------------
def get_key() -> str | None:
"""Block until a single key is pressed, return it lowercase."""
try:
import msvcrt # Windows
ch = msvcrt.getwch()
return ch.lower()
except ImportError:
pass
# Unix / macOS
import tty, termios
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setraw(fd)
ch = sys.stdin.read(1)
return ch.lower()
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
# Script entry point: run the async main() to completion. A Ctrl-C that
# reaches asyncio.run() ends the program with a short farewell rather than
# a traceback.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('\nBye.')