233 lines
6.8 KiB
Python
233 lines
6.8 KiB
Python
"""
|
|
Viseme Sender - sends viseme packets to the robot over WebSocket.
|
|
|
|
Usage:
|
|
python viseme_sender.py [robot_ip]
|
|
|
|
Default IP: 192.168.1.x (auto-discovered or specify as argument)
|
|
Connects to ws://<ip>:81
|
|
|
|
Controls:
|
|
0-9, A = send viseme 0-10
|
|
Space = send neutral (viseme 0 / sil)
|
|
Q / Esc = quit
|
|
L = request viseme list (VLST)
|
|
"""
|
|
|
|
import asyncio
|
|
import struct
|
|
import sys
|
|
|
|
try:
|
|
import websockets
|
|
except ImportError:
|
|
print("Missing dependency. Install with: pip install websockets")
|
|
sys.exit(1)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Protocol helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
SYNC = b'\xA5\x5A'
|
|
|
|
def crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
|
|
crc = init
|
|
for b in data:
|
|
crc ^= b << 8
|
|
for _ in range(8):
|
|
if crc & 0x8000:
|
|
crc = (crc << 1) ^ 0x1021
|
|
else:
|
|
crc <<= 1
|
|
crc &= 0xFFFF
|
|
return crc
|
|
|
|
_seq = 0
|
|
|
|
def build_packet(tag: str, payload: bytes = b'') -> bytes:
|
|
global _seq
|
|
tag_bytes = tag.encode('ascii')[:4].ljust(4, b'\x00')
|
|
length = len(payload)
|
|
header_tail = tag_bytes + struct.pack('<HH', length, _seq)
|
|
crc_data = header_tail + payload
|
|
crc = crc16_ccitt(crc_data)
|
|
packet = SYNC + crc_data + struct.pack('<H', crc)
|
|
_seq = (_seq + 1) & 0xFFFF
|
|
return packet
|
|
|
|
def parse_packet(data: bytes):
|
|
"""Parse a protocol packet, return (tag, payload) or None."""
|
|
if len(data) < 12:
|
|
return None
|
|
if data[0:2] != SYNC:
|
|
return None
|
|
tag = data[2:6].decode('ascii', errors='replace')
|
|
length = struct.unpack('<H', data[6:8])[0]
|
|
# seq = struct.unpack('<H', data[8:10])[0]
|
|
if len(data) < 10 + length + 2:
|
|
return None
|
|
payload = data[10:10 + length]
|
|
received_crc = struct.unpack('<H', data[10 + length:12 + length])[0]
|
|
computed_crc = crc16_ccitt(data[2:10 + length])
|
|
if received_crc != computed_crc:
|
|
return None
|
|
return tag, payload
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Viseme table
|
|
# ---------------------------------------------------------------------------
|
|
|
|
VISEMES = {
|
|
0: 'sil',
|
|
1: 'AA',
|
|
2: 'AE',
|
|
3: 'AH',
|
|
4: 'AO',
|
|
5: 'EH',
|
|
6: 'IH',
|
|
7: 'IY',
|
|
8: 'OW',
|
|
9: 'UH',
|
|
10: 'UW',
|
|
}
|
|
|
|
KEY_MAP = {
|
|
'0': 0, '1': 1, '2': 2, '3': 3, '4': 4,
|
|
'5': 5, '6': 6, '7': 7, '8': 8, '9': 9,
|
|
'a': 10, 'b': 11, 'c': 12, 'd': 13, 'e': 14,
|
|
'f': 15, 'g': 16, 'h': 17, 'i': 18, 'j': 19,
|
|
' ': 0,
|
|
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def main():
|
|
ip = sys.argv[1] if len(sys.argv) > 1 else '192.168.1.1'
|
|
uri = f'ws://{ip}:81'
|
|
|
|
print(f'Connecting to {uri} ...')
|
|
try:
|
|
ws = await websockets.connect(uri)
|
|
except Exception as e:
|
|
print(f'Connection failed: {e}')
|
|
return
|
|
|
|
print(f'Connected to {uri}')
|
|
print()
|
|
print('Viseme keys:')
|
|
for k, vid in sorted(KEY_MAP.items()):
|
|
label = VISEMES.get(vid, f'viseme {vid}')
|
|
key_label = 'Space' if k == ' ' else k.upper()
|
|
print(f' [{key_label}] -> {vid}: {label}')
|
|
print(' [L] -> list visemes from device')
|
|
print(' [Q/Esc] -> quit')
|
|
print()
|
|
|
|
# Background task to receive and print responses
|
|
async def receiver():
|
|
try:
|
|
async for message in ws:
|
|
if isinstance(message, bytes):
|
|
result = parse_packet(message)
|
|
if result:
|
|
tag, payload = result
|
|
if tag == 'VLST':
|
|
print_viseme_list(payload)
|
|
elif tag == 'ACK!':
|
|
print(f' <- ACK')
|
|
elif tag.startswith('NAC'):
|
|
msg = payload.decode('ascii', errors='replace') if payload else ''
|
|
print(f' <- NACK: {msg}')
|
|
elif tag == 'MSGE':
|
|
msg = payload.decode('ascii', errors='replace')
|
|
print(f' <- MSG: {msg}')
|
|
else:
|
|
print(f' <- [{tag}] {len(payload)} bytes')
|
|
except websockets.exceptions.ConnectionClosed:
|
|
pass
|
|
|
|
recv_task = asyncio.create_task(receiver())
|
|
|
|
# Input loop (runs in executor to avoid blocking)
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
while True:
|
|
key = await loop.run_in_executor(None, get_key)
|
|
if key is None or key in ('q', '\x1b'):
|
|
break
|
|
if key == 'l':
|
|
pkt = build_packet('VLST')
|
|
await ws.send(pkt)
|
|
print(' -> VLST (list visemes)')
|
|
continue
|
|
if key in KEY_MAP:
|
|
vid = KEY_MAP[key]
|
|
pkt = build_packet('VSME', bytes([vid]))
|
|
await ws.send(pkt)
|
|
label = VISEMES.get(vid, f'viseme {vid}')
|
|
print(f' -> VSME {vid}: {label}')
|
|
except (KeyboardInterrupt, EOFError):
|
|
pass
|
|
finally:
|
|
recv_task.cancel()
|
|
await ws.close()
|
|
print('Disconnected.')
|
|
|
|
|
|
def print_viseme_list(payload: bytes):
|
|
if len(payload) < 1:
|
|
print(' <- VLST: (empty)')
|
|
return
|
|
count = payload[0]
|
|
print(f' <- VLST: {count} visemes')
|
|
pos = 1
|
|
for _ in range(count):
|
|
if pos + 4 >= len(payload):
|
|
break
|
|
vid = payload[pos]
|
|
label = payload[pos+1:pos+4].decode('ascii', errors='replace')
|
|
motor_count = payload[pos+4]
|
|
pos += 5
|
|
motors = []
|
|
for _ in range(motor_count):
|
|
if pos + 3 > len(payload):
|
|
break
|
|
mid = payload[pos]
|
|
mpos = struct.unpack('<H', payload[pos+1:pos+3])[0]
|
|
motors.append(f'{mid}={mpos}')
|
|
pos += 3
|
|
print(f' [{vid:2d}] "{label}" motors: {", ".join(motors) if motors else "(none)"}')
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cross-platform single key input
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_key() -> str | None:
|
|
"""Block until a single key is pressed, return it lowercase."""
|
|
try:
|
|
import msvcrt # Windows
|
|
ch = msvcrt.getwch()
|
|
return ch.lower()
|
|
except ImportError:
|
|
pass
|
|
# Unix / macOS
|
|
import tty, termios
|
|
fd = sys.stdin.fileno()
|
|
old = termios.tcgetattr(fd)
|
|
try:
|
|
tty.setraw(fd)
|
|
ch = sys.stdin.read(1)
|
|
return ch.lower()
|
|
finally:
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
asyncio.run(main())
|
|
except KeyboardInterrupt:
|
|
print('\nBye.')
|