Fixed lots and lots of errors. Added and tweaked some helper functions. Main loop is now partially working.

This commit is contained in:
d3rped 2018-03-26 03:10:27 +02:00
parent c1f68b84e3
commit f30868635b
6 changed files with 109 additions and 39 deletions

View File

@ -2,6 +2,7 @@
from planet import Planet, Direction
from enum import Enum
from time import sleep
"""
Class to hold the MQTT client
@ -99,23 +100,32 @@ class Communication:
self.encode_message(Command.READY, None)
self.client.loop_start()
self._status = Mode.EXPLORE
self.navto = []
def status(self):
return self._status
def update(self, odometry, edges): # TODO: FINISH
if(odometry != None):
self.planet.setcurnode(self.planet.getcurnode() + odometry[0])
if(edges is None and self._status is Mode.EXPLORE):
if(self.planet.unexedge == []):
self.encode_message(Command.COMPLETE, None)
else:
self.navto = next(filter(lambda x: x is not None, map(lambda edge: self.planet.shortest_path(self.planet.getcurnode(), edge), self.planet.unexedge)))
self._status = Mode.GOTOSTATION
elif(edges is None):
pass
if(odometry is not None): # use odometry to send data
self.encode_message(Command.PATH, ((self.planet.getcurnode(), self.planet.getcurdir()), ((self.planet.getcurnode()[0] + odometry[0][0], self.planet.getcurnode()[1] + odometry[0][1]), (odometry[1] - 180) % 360), odometry[2]))
if(self._status is Mode.GOTOSTATION or self._status is Mode.TARGET):
self.encode_message(Command.PATH, ((self.navto[0][0], self.navto[0][1]), (self.planet._planetmap[self.navto[0][0]][self.navto[0][1]][0], self.planet._planetmap[self.navto[0][0]][self.navto[0][1]][1]), self.planet._planetmap[self.navto[0][0]][self.navto[0][1]][2]))
self.navto.pop(0)
self.process_messages() # this should update curnode and curdir
else:
pass
self.process_messages() # this should update curnode and curdir
edges = self.planet.checkedges(edges)
if(edges == [] and self._status is Mode.EXPLORE):
if(self.planet.unexedge == []):
self.encode_message(Command.COMPLETE, None)
else:
self.navto = next(filter(lambda x: x is not None, map(lambda edge: self.planet.shortest_path(self.planet.getcurnode(), edge), self.planet.unexedge)))
self._status = Mode.GOTOSTATION
else:
self.navto = [(self.planet.getcurnode(), edges[0])]
def getdir(self):
return ((self.planet.getcurnode(), self.planet.getcurdir()))
# THIS FUNCTIONS SIGNATURE MUST NOT BE CHANGED
""" Handles the callback if any message arrived """
@ -136,23 +146,29 @@ class Communication:
self.msg_queue.append(("planet/" + self.planetname, Command.SEND + Command.PATH + self.pathtostring(msgdata[0], msgdata[1], msgdata[2])))
def process_messages(self):
for messages in self.msg_queue:
if(type(messages) == tuple):
self.client.publish(messages[0], payload=messages[1], qos=1)
elif(messages.payload.decode('utf-8').startswith(Command.RECEIVE)):
self.comexec(messages.payload.decode('utf-8')[4:])
self.msg_queue.clear()
while(self.msg_queue != []):
for messages in self.msg_queue:
if(type(messages) == tuple):
print("Outgoing Message is:", messages)
self.client.publish(messages[0], payload=messages[1], qos=1)
elif(messages.payload.decode('utf-8').startswith(Command.RECEIVE)):
print("Incoming Message is:", messages.payload.decode('utf-8'))
self.comexec(messages.payload.decode('utf-8')[4:])
self.msg_queue.clear()
sleep(2)
def comexec(self, message):
if not any([message.startswith(instruction) for instruction in Command]): # is planet name and starting position
[self.planetname, startnode] = message.rsplit(' ')
self.subscribetochannel("planet/" + self.planetname)
self.planet.setcurnode(str2tuple(*startnode.rsplit(',')))
# Fake Message to set start as blocked
self.encode_message(Command.PATH, [(self.planet.getcurnode(), Direction.SOUTH), (self.planet.getcurnode(), Direction.SOUTH), -1])
elif message.startswith(Command.PATH):
self.stringtopath(*message[5:].rsplit(' '))
elif message.startswith(Command.TARGET):
self.navto = self.planet.shortest_path(self.planet.getcurnode(), str2tuple(*message[7:].rsplit(',')))
if(self.navto != None):
if(self.navto is not None):
self._status = Mode.TARGET
elif message.startswith(Command.NOTICE):
print(message)
@ -161,9 +177,11 @@ class Communication:
snode = (str2tuple(*start[:-2].rsplit(',')), str2dir(start[-1:]))
tnode = (str2tuple(*target[:-2].rsplit(',')), str2dir(target[-1:]))
self.planet.add_path(snode, tnode, int(weight))
if(snode[0] == self.planet.getcurnode() or self.planet.getcurnode is None):
self.planet.setcurnode(tnode[0])
def pathtostring(self, start, target, blocked):
if(blocked):
if(blocked == -1):
path = "blocked"
else:
path = "free"

View File

@ -4,6 +4,7 @@ import paho.mqtt.client as mqtt
from planet import Direction, Planet
from communication import Mode, Communication
from move import Move
from time import sleep
# DO NOT EDIT
client = None
@ -23,6 +24,17 @@ def run():
while(communication.planetname is None):
communication.process_messages()
# run first time not in loop because odometry data is not relevant
communication.update(None, move.getstationedges()) # TODO: Register in map
move.turnto(communication.navto[0][1]) # TODO: add control mechanism to communication class
sleep(4)
communication.planet.setcurdir(communication.navto[0][1])
move.setcurdir(communication.getdir())
move.reset()
print("current dir:", move._odometry.angle_get())
print("Navigation Data:", communication.navto)
move.traversetonextstation(False)
while(communication.status() is not Mode.COMPLETE):
while(communication.status() is Mode.GOTOSTATION or communication.status() is Mode.TARGET): # should be triggered if current station has no unexplored edges
for instructions in communication.navto:
@ -31,11 +43,15 @@ def run():
move.turnto(instructions[1])
move.traversetonextstation(True)
communication.update(move._odometry.coordinates_get(), move.getstationedges()) # TODO: Register in map
move.updatevars()
communication.update(move.getstats(), move.getstationedges()) # TODO: Register in map
move.turnto(communication.navto[0][1]) # TODO: add control mechanism to communication class
if(communication.check(move.traversetonextstation(False)) is False):
move.setcurdir(communication.getdir())
sleep(4)
communication.planet.setcurdir(communication.navto[0][1])
move.setcurdir(communication.getdir())
move.reset()
print("current dir:", move._odometry.angle_get())
print("Navigation Data:", communication.navto)
move.traversetonextstation(False)
# DO NOT EDIT

View File

@ -38,19 +38,23 @@ class Move:
while(not self._sensor.bumperwaspressed):
self._sensor.checkbumper()
self._sensor.reset()
sleep(1)
self._sensor.calibrateBrightness()
sleep(2)
ev3.Sound.beep()
while(not self._sensor.bumperwaspressed):
self._sensor.checkbumper()
self._sensor.reset()
sleep(1)
self._sensor.calibrateRGB(self._sensor.red)
sleep(2)
ev3.Sound.beep()
while(not self._sensor.bumperwaspressed):
self._sensor.checkbumper()
self._sensor.calibrateRGB(self._sensor.blue)
self._sensor.reset()
sleep(1)
self._sensor.calibrateRGB(self._sensor.blue)
sleep(5)
'''
Function to correct errors should the robot wander astray
@ -66,8 +70,11 @@ class Move:
self._sensor.refresh()
self._wheel_l.stop()
self._wheel_r.stop()
self._wheel_l.turnbyamount(20, 20)
self._wheel_r.turnbyamount(-20, 20)
self._wheel_l.turnbyamount(40, 40)
self._wheel_r.turnbyamount(-40, 40)
def setcurcir(self, curdir):
self.odometry.angle_set(curdir[1])
'''
uses odometry and the planet object to map edges that are connected to
@ -80,22 +87,41 @@ class Move:
found_edges = []
self._odometry.reset()
while(True):
self._wheel_l.turnbyamount(40, 60)
self._wheel_r.turnbyamount(-40, 60)
self._aligntoedge()
self._odometry.pos_update((self._wheel_l.getmovement(), self._wheel_r.getmovement()))
if(self._odometry.angle_get() in found_edges):
break
else:
found_edges.append(self._odometry.angle_get())
self._wheel_l.turnbyamount(20, 20)
self._wheel_r.turnbyamount(-20, 20)
self._odometry.angle_set(self._odometry.angle_get())
self._wheel_l.turnbyamount(40, 60)
self._wheel_r.turnbyamount(-40, 60)
sleep(2)
self._aligntoedge()
self._odometry.pos_update((self._wheel_l.getmovement(), self._wheel_r.getmovement()))
self._odometry.angle_set(self._odometry.angle_get())
return found_edges
def setcurdir(self, newdir):
self._odometry.angle_set(newdir[1])
def getstats(self):
stats = list(self._odometry.coordinates_get())
stats.append(self._sensor.bumperwaspressed)
return stats
def turnto(self, direction):
turnval = self._odometry.dir2ticks(direction)
self._wheel_l.turnbyamount(turnval, 90)
self._wheel_r.turnbyamount(-turnval, 90)
self._wheel_l.turnbyamount(turnval, 120)
self._wheel_r.turnbyamount(-turnval, 120)
sleep(5)
self._odometry.pos_update((self._wheel_l.getmovement(), self._wheel_r.getmovement()))
def reset(self):
self._odometry.reset()
self._sensor.reset()
def traversetonextstation(self, isknownstation):
self._wheel_l.speed_set(self._sensor.edgebrightness//2)
@ -112,6 +138,8 @@ class Move:
self._wheel_r.speed_set(capat100(self._sensor.getbrightness()/2 - self._sensor.edgebrightness/2)+self.defaultspeed)
if(not isknownstation):
self._odometry.pos_update((self._wheel_l.getmovement(), self._wheel_r.getmovement()))
# if() bumper or color
self._wheel_l.stop()
self._wheel_r.stop()
self._wheel_l.turnbyamount(200, 90)
self._wheel_r.turnbyamount(200, 90)
sleep(2)

View File

@ -51,7 +51,7 @@ class Odometry:
self._distance = [0, 0]
self._wheel_center = 0
self._v_length = 0
self._v_angle = Direction.NORTH
self._v_angle = math.radians(Direction.NORTH)
def reset(self):
self._distance = [0, 0]
@ -59,7 +59,7 @@ class Odometry:
self._posxy = [0, 0]
def angle_set(self, newangle):
self._v_angle = newangle
self._v_angle = math.radians(newangle)
def angle_get(self):
return angle2dir(self._v_angle)
@ -74,14 +74,10 @@ class Odometry:
self._v_angle = self._v_angle + (self._distance[0] - self._distance[1])/self._wheel_axis
self._wheel_center = (self._distance[0] + self._distance[1])/2
self.coordinates_update()
print((self._v_length, angle2dir(self._v_angle)))
def coordinates_update(self): # worked as function outside the class
self._posxy[0] = self._posxy[0] + self._wheel_center * math.sin(self._v_angle)
self._posxy[1] = self._posxy[1] + self._wheel_center * math.cos(self._v_angle)
print((self._posxy[0], self._posxy[1]))
def dir2ticks(self, destdir): # return amount of ticks to turn to a given direction (current direction should be _v_angle)
turnval = (destdir - self._v_angle) * 2
self._v_angle = destdir
return turnval
return (destdir - (math.degrees(self._v_angle)%360)) * 2

View File

@ -75,6 +75,9 @@ class Planet:
def getcurnode(self):
return self._curnode
def setcurdir(self, direction):
self._curdir = direction
def getcurdir(self):
return self._curdir
@ -84,6 +87,15 @@ class Planet:
def exploreedge(self, node_edge):
self.unexedge.remove(node_edge)
def checkedges(self, testnode):
if(self._curnode not in self._planetmap):
self._planetmap[self._curnode] = {}
for edges in testnode:
if(edges in self._planetmap[self._curnode]):
testnode.remove(edges)
# maybe do some explored node stuff here?
return testnode
'''
Returns a shortest path between two nodes.
Used Algorithm: Dijkstra's Algorithm

View File

@ -76,6 +76,6 @@ class Sensor:
if (color is self.red):
newRED = (self.RGB[0]-5, self.RGB[1]+5, self.RGB[2]+5)
self.red = newRED
elif (color is self.blueE):
elif (color is self.blue):
newBLUE = (self.RGB[0]+5, self.RGB[1]-5, self.RGB[2]-5)
self.blue = newBLUE