172 lines
4.1 KiB
Python
172 lines
4.1 KiB
Python
import network
|
|
import time
|
|
import socket
|
|
import select
|
|
import neopixel
|
|
from machine import Pin
|
|
|
|
def do_connect(tries=0):
    """Join the open "39C3-open" Wi-Fi network as a station.

    Each round (re)activates the station interface and makes up to four
    connection attempts (scan, connect, wait 10 s).  A failed attempt is
    followed by a 5 s pause.  After four failed attempts the interface is
    switched off and a new round starts.  Once more than three rounds have
    failed the board is reset.

    Args:
        tries: Number of rounds that have already failed.

    Returns:
        None, as soon as the interface reports that it is connected.
    """
    # The file only does `from machine import Pin`, so the `machine` module
    # itself has to be imported here for `machine.reset()` to resolve.
    import machine

    while True:
        if tries > 3:
            machine.reset()

        print('Trying to connect to Wifi')
        sta_if = network.WLAN(network.WLAN.IF_STA)
        sta_if.active(True)
        sta_if.disconnect()

        for _ in range(4):
            print(' Scan: ', sta_if.scan())
            sta_if.connect("39C3-open", "")
            time.sleep(10)
            if sta_if.isconnected():
                print(sta_if.ifconfig())
                return
            print(' Failed to connect. Sleeping 5s')
            sta_if.disconnect()
            time.sleep(5)

        # All attempts of this round failed: power-cycle the interface and
        # start the next round.  Looping (instead of recursing without a
        # return) guarantees we stop as soon as a later round succeeds.
        sta_if.active(False)
        tries += 1
|
|
|
|
def do_ap():
    """Bring up the Wi-Fi access point with its fixed settings.

    Activates the AP interface, applies the hard-coded SSID, channel and key,
    assigns a static address configuration and prints the resulting SSID and
    channel followed by a ready message.
    """
    access_point = network.WLAN(network.WLAN.IF_AP)
    access_point.active(True)
    access_point.config(ssid='My AP123', channel=11, key='asdf1234')

    # Tuple order expected by ifconfig(): address, netmask, gateway, DNS.
    static_config = ('192.168.0.4', '255.255.255.0', '192.168.0.1', '8.8.8.8')
    access_point.ifconfig(static_config)

    # Read the values back from the interface rather than echoing constants.
    for setting in ('ssid', 'channel'):
        print(access_point.config(setting))
    print('AP ready.')
|
|
|
|
|
|
def setup_listener():
    """Create the TCP listening socket for the remote-control connection.

    The socket is bound to all interfaces on port 1337 with a backlog of 3
    and is left in blocking mode.

    Returns:
        The bound, listening socket.

    Raises:
        OSError: If the socket cannot be configured, bound or put into
            listening mode.  The socket is closed before re-raising.
    """
    addr = socket.getaddrinfo('0.0.0.0', 1337)[0][-1]
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Allow re-binding the port right after a soft reset / restart;
        # without this, bind() can fail with EADDRINUSE.
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(addr)
        listener.listen(3)
    except OSError:
        # Do not leak the socket when setup fails half-way.
        listener.close()
        raise
    return listener
|
|
|
|
|
|
# Motor control outputs: two pins per motor (A and B) on GPIO 0-3, all
# configured as outputs with the DRIVE_0 drive setting.  controlloop()
# switches one pin of a pair on and the other off to select a direction.
MOTOR_A0, MOTOR_A1, MOTOR_B0, MOTOR_B1 = [
    Pin(gpio, Pin.OUT, drive=Pin.DRIVE_0) for gpio in (0, 1, 2, 3)
]

# LED strip: data line on GPIO 6 (DRIVE_3 drive setting), 6 pixels with
# 4 bytes per pixel.
LED_DAT = Pin(6, Pin.OUT, drive=Pin.DRIVE_3)
LED_NP = neopixel.NeoPixel(LED_DAT, 6, timing=1, bpp=4)

# Networking: station mode is currently disabled, the board runs its own
# access point instead.
#do_connect()
do_ap()

# Listening socket used by mainloop().
s = setup_listener()

# Not used in the visible part of this file.
c = []

# Set to True to get verbose prints from controlloop() and mainloop().
debug = False
|
|
|
|
def _drive_motor(pin0, pin1, direction):
    """Set one motor's pin pair: 1 -> pin0 on, -1 -> pin1 on, else both off."""
    if direction == 1:
        pin0.on()
        pin1.off()
    elif direction == -1:
        pin0.off()
        pin1.on()
    else:
        pin0.off()
        pin1.off()


def controlloop(last_start, next_action):
    """Run one step of the motor control state machine.

    Args:
        last_start: ``time.ticks_us()`` value at which the last non-stop
            action was started.
        next_action: ``(motor_a, motor_b)`` pair where each entry is 1, -1 or
            anything else (= motor off), or ``None`` if there is no request.

    Returns:
        The new "last start" tick value: the current tick count if a non-stop
        action was just applied, otherwise ``last_start`` unchanged.
    """
    now = time.ticks_us()

    if debug:
        print('CTRL: ', last_start, next_action)

    # Dead time: ignore everything for 100 ms after the last action start.
    deadtime_end = time.ticks_add(last_start, 100000)
    if time.ticks_diff(now, deadtime_end) < 0:
        if debug:
            print(' DEADTIME -> ignore')
        return last_start

    if next_action is None:
        # NOTE(review): a stop is issued only while `now` is still *before*
        # last_start + 601 ms; after that point nothing is done.  Confirm
        # this direction of the comparison is the intended one.
        stop_window_end = time.ticks_add(last_start, 601000)
        if time.ticks_diff(stop_window_end, now) <= 0:
            return last_start
        if debug:
            print(' ACTION TIMED OUT -> stopping')
        next_action = (0,0)

    if debug:
        print('Starting action: ', next_action)

    _drive_motor(MOTOR_A0, MOTOR_A1, next_action[0])
    _drive_motor(MOTOR_B0, MOTOR_B1, next_action[1])

    # Only a real (non-stop) action restarts the timing window.
    return time.ticks_us() if next_action != (0,0) else last_start
|
|
|
|
|
|
def _stop_motors():
    """Switch all four motor control pins off."""
    for pin in (MOTOR_A0, MOTOR_A1, MOTOR_B0, MOTOR_B1):
        pin.off()


def _serve_client(conn):
    """Drive the motors from one client's key presses until it goes away.

    Returns when the client closes the connection or the socket fails with
    anything other than a read timeout.
    """
    import errno  # local import: only needed to classify socket errors

    key_actions = {
        b'w': (1, 1),
        b'a': (-1, 1),
        b'd': (1, -1),
        b's': (-1, -1),
    }
    timeout_errnos = (errno.ETIMEDOUT, errno.EAGAIN)

    # Short read timeout so controlloop() keeps running without input.
    conn.settimeout(0.1)
    # Telnet "IAC DONT LINEMODE" (bytes 255, 254, 34) followed by the banner.
    # The previous literal "\255\254\34" used *octal* escapes inside a str,
    # which produced different characters than these three bytes.
    conn.write(b"\xff\xfe\x22" b"Welcome!\nPress wasd to move!\n>")

    last_action_start = 0
    next_action = None

    # NOTE: the NeoPixel animation that used to live here was commented out
    # and has been dropped together with its unused counters.
    while True:
        # run one iteration of control loop
        last_action_start = controlloop(last_action_start, next_action)

        try:
            data = conn.read(1)
            if debug:
                print('RCV: ', data)
            if data:
                conn.write(data)  # echo the key back to the client
        except OSError as exc:
            if exc.errno not in timeout_errnos:
                # Connection reset or similar: treat as a disconnect rather
                # than spinning forever on a dead socket.
                return
            data = None  # plain read timeout: no key pressed

        if data == b'':
            # EOF: the client closed the connection.
            return
        if data in (b'\n', b'\r'):
            # Line endings keep the current action unchanged.
            continue
        # Unknown keys and timeouts (None) clear the pending action.
        next_action = key_actions.get(data)


def mainloop(l):
    """Accept control connections on listener *l* forever, one at a time.

    Args:
        l: Listening socket to accept clients from.

    This function does not return.  After each client the motors are
    switched off and the client socket is closed before the next accept, so
    the motors cannot keep running while no client is connected.
    """
    while True:
        print('Accepting connection')
        conn, _ = l.accept()
        try:
            _serve_client(conn)
        finally:
            _stop_motors()
            conn.close()
|
|
|
|
|
|
|
|
|
|
# Script entry: serve control connections on the listening socket `s`.
# mainloop() loops indefinitely, so nothing after this call is reached.
mainloop(s)
|
|
|
|
|