micropython_trasmit_recieve/transmitter.py

324 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from machine import Pin, ADC
import time, network, espnow
# ---------------- ESP-NOW helpers ----------------
# Module-level radio state; both objects are assigned by _espnow_init().
# WLAN station interface (None until initialised).
_espnow_sta = None
# espnow.ESPNow instance (None until initialised).
_espnow = None
# Peer MACs already registered through _espnow_add_peer().
_espnow_peers = []
# All-ones MAC used as the destination for broadcast frames.
_espnow_broadcast = b'\xff\xff\xff\xff\xff\xff'
# Status LED on GPIO 15, driven low at start-up.
led = Pin(15, Pin.OUT);
led.value(0)
# Digital inputs configured with the internal pull-down resistor.
btn2 = Pin(2, Pin.IN, Pin.PULL_DOWN)
btn5 = Pin(5, Pin.IN, Pin.PULL_DOWN)
btn6 = Pin(6, Pin.IN, Pin.PULL_DOWN)
btn7 = Pin(7, Pin.IN, Pin.PULL_DOWN)
btn11 = Pin(11, Pin.IN, Pin.PULL_DOWN)
btn12 = Pin(12, Pin.IN, Pin.PULL_DOWN)
# Short pause before the buttons are sampled
# (presumably to let the inputs settle - TODO confirm).
time.sleep_ms(20)
# Button states latched once at boot. The main loop only honours the
# pairing (btn12) and calibration (btn5) actions while these are still 1.
PAIRMODE_ON_START = btn12.value()
CALIBRATE_ON_START = btn5.value()
print(PAIRMODE_ON_START)
# Analog inputs. adc_3, adc_4, adc_8 and adc_9 are the four axes that
# calibrate() measures; adc_1 and adc_10 are transmitted as raw readings.
adc_3 = ADC(Pin(3))
adc_4 = ADC(Pin(4))
adc_8 = ADC(Pin(8))
adc_9 = ADC(Pin(9))
adc_1 = ADC(Pin(1))
adc_10 = ADC(Pin(10))
def _espnow_parse_mac(mac):
    """Normalise a MAC address to a 6-byte ``bytes`` object.

    Args:
        mac: Either 6 raw bytes (``bytes`` or ``bytearray``) or a string of
            12 hex digits, optionally separated by ``:``, ``-`` or spaces.

    Returns:
        The address as ``bytes`` of length 6.

    Raises:
        ValueError: If *mac* has the wrong type, the wrong length, or
            contains non-hex characters.
    """
    if isinstance(mac, (bytes, bytearray)):
        # Reject malformed raw addresses here instead of handing them to
        # the radio driver.
        if len(mac) != 6:
            raise ValueError('MAC must be 6 bytes')
        return bytes(mac)
    if isinstance(mac, str):
        digits = mac.replace(':', '').replace('-', '').replace(' ', '')
        if len(digits) != 12:
            raise ValueError('MAC must be 12 hex chars')
        # int(..., 16) raises ValueError on any non-hex pair.
        return bytes(int(digits[i:i + 2], 16) for i in range(0, 12, 2))
    raise ValueError('MAC must be bytes or string')
def _espnow_init(channel=6, rate='RATE_LORA_250K'):
    """Bring up the station interface and the ESP-NOW object.

    Args:
        channel: Wi-Fi channel passed to the station config (coerced to int).
        rate: Name of a rate attribute on the ``espnow`` module; names that
            do not exist fall back to ``espnow.RATE_LORA_250K``.

    Returns:
        The active ``espnow.ESPNow`` instance, which is also stored in the
        module-level ``_espnow``.
    """
    global _espnow_sta, _espnow

    # Station interface: activated, disconnected, then configured with the
    # requested channel and the LR protocol.
    _espnow_sta = station = network.WLAN(network.STA_IF)
    station.active(True)
    station.disconnect()
    station.config(channel=int(channel), protocol=network.WLAN.PROTOCOL_LR)

    _espnow = radio = espnow.ESPNow()
    radio.active(True)
    radio.config(rate=getattr(espnow, str(rate), espnow.RATE_LORA_250K))
    return radio
def _espnow_my_mac():
    """Return this device's MAC as reported by the station interface.

    Initialises the radio with default settings first if that has not
    happened yet.
    """
    if _espnow_sta is None:
        _espnow_init()
    station = _espnow_sta
    return station.config('mac')
def _espnow_add_peer(mac):
    """Register *mac* as an ESP-NOW peer unless it was registered before.

    Args:
        mac: Address in any form accepted by ``_espnow_parse_mac``.

    Returns:
        The parsed peer address.
    """
    if _espnow is None:
        _espnow_init()
    address = _espnow_parse_mac(mac)
    # _espnow_peers remembers what this helper has already registered.
    if address in _espnow_peers:
        return address
    _espnow.add_peer(address)
    _espnow_peers.append(address)
    return address
def _espnow_pack(data):
    """Serialise *data* to the text form that is sent over the air.

    Lists and tuples become their items' ``str()`` values joined by commas;
    anything else is simply passed through ``str()``.
    """
    if not isinstance(data, (list, tuple)):
        return str(data)
    return ','.join([str(item) for item in data])
def _espnow_send_peer(mac, data):
    """Send *data* to a single peer, registering the peer when needed.

    Args:
        mac: Destination address in any form ``_espnow_parse_mac`` accepts.
        data: Value serialised with ``_espnow_pack`` before sending.
    """
    if _espnow is None:
        _espnow_init()
    destination = _espnow_add_peer(mac)
    _espnow.send(destination, _espnow_pack(data))
def _espnow_send_all(data):
    """Broadcast *data* to every listener via the all-ones MAC address.

    Args:
        data: Value serialised with ``_espnow_pack`` before sending.
    """
    if _espnow is None:
        _espnow_init()
    payload = _espnow_pack(data)
    try:
        _espnow.add_peer(_espnow_broadcast)
    except OSError:
        # Best effort: the broadcast address is registered on every call, so
        # repeat calls are expected to be refused by the driver. Only driver
        # errors are ignored; anything else (e.g. KeyboardInterrupt) is no
        # longer swallowed.
        pass
    _espnow.send(_espnow_broadcast, payload)
def _espnow_try_num(s):
    """Convert *s* to a number when it looks like one.

    Tries ``int`` first, then ``float``.

    Returns:
        The int or float value, or *s* unchanged when neither conversion
        applies.
    """
    # Only conversion failures are handled; unrelated exceptions propagate.
    try:
        return int(s)
    except (ValueError, TypeError):
        pass
    try:
        return float(s)
    except (ValueError, TypeError):
        return s
def _espnow_unpack(msg):
    """Split a received comma-separated message into a list of fields.

    Args:
        msg: ``None``, ``bytes``, ``bytearray`` or any object whose ``str()``
            is the comma-separated text.

    Returns:
        A list with one entry per non-empty field, each passed through
        ``_espnow_try_num``. ``None`` yields an empty list.
    """
    if msg is None:
        return []
    if isinstance(msg, (bytes, bytearray)):
        # A bytearray must be decoded too; str() on it would give the
        # "bytearray(b'...')" repr rather than the message text.
        msg = bytes(msg).decode('utf-8', 'ignore')
    fields = (field.strip() for field in str(msg).split(','))
    return [_espnow_try_num(field) for field in fields if field != '']
def _espnow_recv(timeout_ms=10):
    """Receive one message and return it in unpacked form.

    Args:
        timeout_ms: Timeout handed to ``ESPNow.recv``.

    Returns:
        ``(host, fields)`` where *fields* is the result of
        ``_espnow_unpack`` on the received message.
    """
    if _espnow is None:
        _espnow_init()
    sender, raw = _espnow.recv(timeout_ms)
    fields = _espnow_unpack(raw)
    return sender, fields
# ---------------- Pairing logic ----------------
# First field of the broadcast pairing request sent by enter_pairing().
PAIR_PREFIX = "PAIR_REQ:"
# First field enter_pairing() expects in a partner's reply.
PAIR_ACK_PREFIX = "PAIR_ACK:"
# File that load_peer()/save_peer() use to persist the partner's MAC bytes.
PEER_FILE = "peer_mac.txt"
# MAC of the current partner; None until one is loaded or paired.
peer_mac = None
# True while enter_pairing() is running its broadcast loop.
pairing_mode = False
def load_peer():
    """Read the stored peer MAC from ``PEER_FILE``.

    Returns:
        The file contents as ``bytes``, or ``None`` when the file cannot be
        opened or read.
    """
    try:
        with open(PEER_FILE, "rb") as f:
            return f.read()
    except OSError:
        # A missing or unreadable file simply means "not paired yet".
        return None
def save_peer(mac):
    """Persist *mac* (raw bytes) to ``PEER_FILE``, replacing old contents."""
    with open(PEER_FILE, "wb") as peer_file:
        peer_file.write(mac)
def _pairing_ack_mac(raw, ack_prefix):
    """Extract the partner MAC from a raw pairing acknowledgement frame.

    The frame is handled purely as text. Passing the hex field through a
    numeric conversion would corrupt addresses such as ``001122334455``
    (leading zeros dropped) or ``3485181e2345`` (read as a float).

    Args:
        raw: Received message (``bytes``, ``bytearray``, ``str`` or ``None``).
        ack_prefix: Expected first field of an acknowledgement.

    Returns:
        The 6-byte MAC, or ``None`` when the frame is not a well-formed
        acknowledgement.
    """
    if not raw:
        return None
    if isinstance(raw, (bytes, bytearray)):
        raw = bytes(raw).decode('utf-8', 'ignore')
    fields = [part.strip() for part in str(raw).split(',')]
    fields = [part for part in fields if part != '']
    if len(fields) < 2 or fields[0] != ack_prefix:
        return None
    partner_hex = fields[1]
    if len(partner_hex) != 12:  # sanity check
        return None
    try:
        return bytes(int(partner_hex[i:i + 2], 16) for i in range(0, 12, 2))
    except ValueError:
        # Non-hex characters: ignore the frame rather than abort pairing.
        return None


def enter_pairing():
    """Broadcast pairing requests until a partner acknowledges.

    Each pass broadcasts ``[PAIR_PREFIX, <own MAC hex>]``, toggles the LED,
    waits up to 50 ms for a reply and then sleeps 500 ms. When a valid
    acknowledgement arrives, the partner MAC is saved with ``save_peer`` and
    stored in the global ``peer_mac``, and the loop ends. There is no
    overall timeout.
    """
    global pairing_mode, peer_mac
    pairing_mode = True
    my_mac = _espnow_my_mac()
    request = [PAIR_PREFIX, my_mac.hex()]
    print("Pairing mode started, broadcasting...")
    while pairing_mode:
        # Broadcast as two fields
        _espnow_send_all(request)
        print("Sent:", request)
        led.value(not led.value())
        # Read the raw frame: the generic unpack helper converts
        # numeric-looking fields and would mangle the MAC hex string.
        host, raw = _espnow.recv(50)
        partner = _pairing_ack_mac(raw, PAIR_ACK_PREFIX)
        if partner is not None:
            save_peer(partner)
            peer_mac = partner
            print("Paired with", partner.hex())
            pairing_mode = False
        time.sleep_ms(500)
def calibrate():
    """Measure centre, minimum and maximum readings of the four axes.

    Runs two timed phases on adc_3, adc_4, adc_8 and adc_9:

    1. 3000 ms of sampling, averaged per axis to give the centre values
       (LED toggled every 83 ms).
    2. 5000 ms of sampling that tracks the per-axis minimum and maximum
       (LED toggled every 250 ms).

    Returns:
        ``(centers, mins, maxs)``, three lists with one int per axis.
    """
    print("CALIBRATING")
    # ADCs
    sticks = [adc_3, adc_4, adc_8, adc_9]
    axis_count = len(sticks)

    # --- Centering phase ---
    totals = [0] * axis_count
    reads = 0
    began = time.ticks_ms()
    toggle_at = began
    while time.ticks_diff(time.ticks_ms(), began) < 3000:
        readings = [stick.read() for stick in sticks]
        for idx in range(axis_count):
            totals[idx] += readings[idx]
        reads += 1
        # LED flash 6 Hz
        if time.ticks_diff(time.ticks_ms(), toggle_at) >= 0:
            led.value(not led.value())
            toggle_at = time.ticks_add(toggle_at, 83)  # ~83ms
        time.sleep_ms(5)
    centers = [int(total / reads) for total in totals]
    print("Centers:", centers)

    # --- Min/Max phase ---
    mins = [65535] * axis_count
    maxs = [0] * axis_count
    began = time.ticks_ms()
    toggle_at = began
    while time.ticks_diff(time.ticks_ms(), began) < 5000:
        readings = [stick.read() for stick in sticks]
        for idx in range(axis_count):
            mins[idx] = min(mins[idx], readings[idx])
            maxs[idx] = max(maxs[idx], readings[idx])
        # LED flash 2 Hz
        if time.ticks_diff(time.ticks_ms(), toggle_at) >= 0:
            led.value(not led.value())
            toggle_at = time.ticks_add(toggle_at, 250)  # 250ms
        time.sleep_ms(5)
    print("Min/Max:", list(zip(mins, maxs)))

    # Return calibration data
    return centers, mins, maxs
def apply_deadzone(norm_val, deadzone=0.05):
    """Suppress small deflections and rescale the rest to the full range.

    Args:
        norm_val: Normalised axis value.
        deadzone: Magnitude below which the result is forced to 0.0.

    Returns:
        0.0 inside the deadzone; otherwise the value shifted toward zero by
        *deadzone* and divided by ``1.0 - deadzone``.
    """
    if abs(norm_val) < deadzone:
        return 0.0
    # Shift toward zero: subtract for positive values, add otherwise.
    shift = deadzone if norm_val > 0 else -deadzone
    return (norm_val - shift) / (1.0 - deadzone)
def normalize_axis(raw, center, min_val, max_val):
    """Map a raw axis reading to an integer in -255..+255.

    Args:
        raw: Current reading.
        center: Calibrated rest position.
        min_val: Calibrated minimum reading.
        max_val: Calibrated maximum reading.

    Returns:
        0 when the calibrated travel on the relevant side of centre is not
        positive; otherwise the deadzone-adjusted deflection scaled by 255
        and truncated to int.
    """
    # Each side of centre is scaled by its own calibrated travel.
    travel = (max_val - center) if raw >= center else (center - min_val)
    if travel <= 0:
        return 0
    # Clamp to [-1, 1]
    ratio = max(min((raw - center) / travel, 1), -1)
    # Apply deadzone (still in -1…+1), then scale to -255…+255 and
    # return an integer.
    return int(apply_deadzone(ratio) * 255)
# File in which save_calibration()/load_calibration() keep the axis calibration.
CAL_FILE = "calibration.txt"
def save_calibration(centers, mins, maxs):
    """Write the calibration to ``CAL_FILE`` as three comma-separated lines.

    The lines hold, in order, the centre, minimum and maximum values.
    """
    with open(CAL_FILE, "w") as out:
        for row in (centers, mins, maxs):
            out.write(",".join(str(v) for v in row) + "\n")
def load_calibration():
    """Load the calibration written by ``save_calibration``.

    Returns:
        ``(centers, mins, maxs)`` as lists of ints, or
        ``(None, None, None)`` when the file is missing, unreadable, has
        fewer than three lines, or contains a non-integer field.
    """
    try:
        with open(CAL_FILE, "r") as f:
            lines = f.readlines()
        centers = [int(v) for v in lines[0].strip().split(",")]
        mins = [int(v) for v in lines[1].strip().split(",")]
        maxs = [int(v) for v in lines[2].strip().split(",")]
        return centers, mins, maxs
    except (OSError, ValueError, IndexError):
        # OSError: no/unreadable file; IndexError: too few lines;
        # ValueError: a field that is not an integer.
        return None, None, None
# Globals to hold calibration
# Each is a list with one int per axis, or None when load_calibration()
# could not produce a calibration.
cal_centers, cal_mins, cal_maxs = load_calibration()
if cal_centers:
    print("Loaded calibration:", cal_centers, cal_mins, cal_maxs)
else:
    print("No calibration stored, will use raw values until calibrated")
# ---------------- Setup ----------------
# Bring the radio up on channel 1, requesting the 'RATE_LORA_500K' rate name.
_espnow_init(1, 'RATE_LORA_500K')
# Reuse a stored partner when there is one; otherwise run pairing, which
# loops until a partner acknowledges.
peer_mac = load_peer()
if not peer_mac:
    enter_pairing()
else:
    _espnow_add_peer(peer_mac)
# ---------------- Main loop ----------------
# NOTE(review): the source as received had lost its indentation. The nesting
# below (in particular which statement the first `else` belongs to, and
# whether the LED toggle and final sleep sit inside `if peer_mac`) is a
# reconstruction - confirm against the original file.
while True:
    # Re-pairing: only considered while btn12 is pressed and was already
    # pressed when its state was latched at boot.
    if btn12.value() == 1 and PAIRMODE_ON_START == 1:
        print("PAIRING")
        start = time.ticks_ms()
        # Busy-wait while the button stays pressed.
        while btn12.value() == 1:
            # Pressed for more than 3000 ms: run pairing, then disarm.
            if time.ticks_diff(time.ticks_ms(), start) > 3000:
                enter_pairing()
                if peer_mac:
                    _espnow_add_peer(peer_mac)
                PAIRMODE_ON_START = 0
                break
    else:
        # Button not pressed (or already disarmed): stay disarmed.
        PAIRMODE_ON_START = 0
    # Calibration: runs once if btn5 is pressed and was pressed at boot.
    if btn5.value() == 1 and CALIBRATE_ON_START == 1:
        print("CALIBRATING")
        cal_centers, cal_mins, cal_maxs = calibrate()
        print(cal_centers, cal_mins, cal_maxs)
        save_calibration(cal_centers, cal_mins, cal_maxs)
        CALIBRATE_ON_START = 0
    else:
        CALIBRATE_ON_START = 0
    if peer_mac:
        if cal_centers and cal_mins and cal_maxs:
            # Normalize the four joystick axes
            axes_raw = [adc_3.read(), adc_4.read(), adc_8.read(), adc_9.read()]
            axes_norm = [
                normalize_axis(axes_raw[0], cal_centers[0], cal_mins[0], cal_maxs[0]),
                normalize_axis(axes_raw[1], cal_centers[1], cal_mins[1], cal_maxs[1]),
                normalize_axis(axes_raw[2], cal_centers[2], cal_mins[2], cal_maxs[2]),
                normalize_axis(axes_raw[3], cal_centers[3], cal_mins[3], cal_maxs[3]),
            ]
        else:
            # Fallback to raw values if not calibrated
            axes_norm = [adc_3.read(), adc_4.read(), adc_8.read(), adc_9.read()]
        # Build the packet explicitly
        # Field order: four axes, two raw ADC readings, six button states.
        payload = [
            axes_norm[0], axes_norm[1], axes_norm[2], axes_norm[3],
            adc_1.read(), adc_10.read(),
            btn2.value(), btn5.value(), btn6.value(),
            btn7.value(), btn11.value(), btn12.value()
        ]
        _espnow_send_peer(peer_mac, payload)
    # Toggle the LED once per pass, then wait 50 ms before the next pass.
    led.value(not led.value())
    time.sleep_ms(50)