Implemented the (not yet fully working) communication class. Added error handling to move.py and fixed the node list in planettest.py.

This commit is contained in:
d3rped 2018-03-22 02:14:44 +01:00
parent 7cff115075
commit f023393113
5 changed files with 161 additions and 31 deletions

View file

@ -1,27 +1,153 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Suggestion: Do not import the ev3dev.ev3 module in this file
class Communication: from planet import Planet, Direction
from enum import Enum
""" """
Class to hold the MQTT client Class to hold the MQTT client
Feel free to add functions, change the constructor and the example send_message() to satisfy your requirements and thereby solve the task according to the specifications Documentation:
On start do the following (make part of __init__):
(1) subscribe to explorer/<N>
(2) publish to explorer/<N>: SYN ready
Next receive the planet name and starting coordinates
(1) receive from explorer/<N>: ACK <Name> <X>,<Y>
(2) subscribe to planet/<Name>
on_message:
Called when a message has been received on a topic that the client subscribes to and the message does not match an existing topic filter callback. Use message_callback_add() to define a callback that will be called for specific topic filters. on_message will serve as fallback when none matched.
client
the client instance for this callback
userdata
the private user data as set in Client() or user_data_set()
message
an instance of MQTTMessage. This is a class with members topic, payload, qos, retain.
mqtt message Documentation:
| Members:
|
| topic : String. topic that the message was published on.
| payload : String/bytes the message payload.
| qos : Integer. The message Quality of Service 0, 1 or 2.
| retain : Boolean. If true, the message is a retained message and not fresh.
| mid : Integer. The message id.
""" """
def __init__(self, mqtt_client, planet):
def str2dir(string):
return {
'N': Direction.NORTH,
'E': Direction.EAST,
'S': Direction.SOUTH,
'W': Direction.WEST
}.get(string)
def dir2str(direction):
return {
Direction.NORTH: 'N',
Direction.EAST: 'E',
Direction.SOUTH: 'S',
Direction.WEST: 'W'
}.get(direction)
def str2tuple(string_a, string_b):
return (eval(string_a), eval(string_b))
def tupel2str(int_a, int_b):
return str(int_a) + ',' + str(int_b)
class Command(str, Enum):
SEND = "SYN " # SYN is short for synchronize
RECEIVE = "ACK " # ACK is short for acknowledge
READY = "ready"
PATH = "path "
TARGET = "target "
COMPLETE = "exploration completed!"
NOTICE = "notice"
class Communication:
""" Initializes communication module, connect to server, subscribe, etc. """ """ Initializes communication module, connect to server, subscribe, etc. """
def __init__(self, mqtt_client):
# THESE TWO VARIABLES MUST NOT BE CHANGED # THESE TWO VARIABLES MUST NOT BE CHANGED
self.msg_queue = []
self.planetname = None
self.planet = Planet()
self.client = mqtt_client self.client = mqtt_client
self.client.on_message = self.on_message self.client.on_message = self.on_message
self.uid = '122'
# ADD YOUR VARIABLES HERE passwd = '7KMuWPT2UE'
self.client.username_pw_set(self.uid, password=passwd)
try:
self.client.connect('robolab.inf.tu-dresden.de', port=8883)
except OSError:
print("ERROR: check your network connection.")
self.subscribetochannel("explorer/" + self.uid)
self.encode_message(Command.READY, None)
self.client.loop_start()
# THIS FUNCTIONS SIGNATURE MUST NOT BE CHANGED # THIS FUNCTIONS SIGNATURE MUST NOT BE CHANGED
def on_message(self, client, data, message):
""" Handles the callback if any message arrived """ """ Handles the callback if any message arrived """
pass def on_message(self, client, data, message):
self.msg_queue.append(message)
# Example def subscribetochannel(self, channelname):
def send_message(self, topic, message): self.client.subscribe(channelname, qos=1)
""" Sends given message to specified channel """
pass def encode_message(self, msgtype, msgdata):
if(msgtype == Command.READY):
print("ENC: Case 1")
self.msg_queue.append(("explorer/" + self.uid, Command.SEND + Command.READY))
if(msgtype == Command.TARGET):
print("ENC: Case 2")
self.msg_queue.append("explorer/" + self.uid, Command.SEND + Command.TARGET + "reached!")
if(msgtype == Command.COMPLETE):
print("ENC: Case 3")
self.msg_queue.append("explorer/" + self.uid, Command.SEND + Command.COMPLETE)
if(msgtype == Command.PATH):
print("ENC: Case 4")
self.msg_queue.append(("planet/" + self.planetname, Command.SEND + Command.PATH + self.pathtostring(msgdata[0], msgdata[1], msgdata[2])))
def process_messages(self):
for messages in self.msg_queue:
if(type(messages) == tuple):
print("Sending Message:", messages)
self.client.publish(messages[0], payload=messages[1], qos=1)
elif(messages.payload.decode('utf-8').startswith(Command.RECEIVE)):
print("Received Message:", messages.payload.decode('utf-8'))
self.comexec(messages.payload.decode('utf-8')[4:])
self.msg_queue.pop(0)
def comexec(self, message):
if not any([message.startswith(instruction) for instruction in Command]): # is planet name and starting position
print("EXEC: Case 1")
[self.planetname, startnode] = message.rsplit(' ')
self.subscribetochannel(self.planetname)
self.planet.setcurnode(str2tuple(*startnode.rsplit(',')))
elif message.startswith(Command.PATH):
print("EXEC: Case 2")
self.stringtopath(*message[5:].rsplit(' '))
elif message.startswith(Command.TARGET):
print("EXEC: Case 3")
self.navto = self.planet.shortest_path(self.planet.getcurnode(), str2tuple(*message[7:].rsplit(',')))
elif message.startswith(Command.NOTICE):
print("EXEC: Case 4")
print(message)
def stringtopath(self, start, target, block, weight):
snode = (str2tuple(*start[:-2].rsplit(',')), str2dir(start[-1:]))
tnode = (str2tuple(*target[:-2].rsplit(',')), str2dir(target[-1:]))
self.planet.add_path(snode, tnode, int(weight))
def pathtostring(self, start, target, blocked):
if(blocked):
path = "blocked"
else:
path = "free"
return tupel2str(*start[0]) + ',' + dir2str(start[1]) + ' ' + tupel2str(*target[0]) + ',' + dir2str(target[1]) + ' ' + path

View file

@ -19,10 +19,6 @@ def run():
clean_session=False, clean_session=False,
protocol=mqtt.MQTTv31) protocol=mqtt.MQTTv31)
# the execution of all code shall be started from within this function
# ADD YOUR OWN IMPLEMENTATION HEREAFTER
print("Hello World!")
# DO NOT EDIT # DO NOT EDIT
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -7,10 +7,13 @@ from sensor import Sensor
class Move: class Move:
def __init__(self, planet): def __init__(self, planet):
try:
self._wheel_l = Wheel('outB') self._wheel_l = Wheel('outB')
self._wheel_r = Wheel('outC') self._wheel_r = Wheel('outC')
self._sensor = Sensor() self._sensor = Sensor()
self._bumper = ev3.TouchSensor() self._bumper = ev3.TouchSensor()
except:
print("ERROR: Cannot find Motor/Sensor")
''' '''
determine maximum and minimum brightness of lines/white space determine maximum and minimum brightness of lines/white space
@ -42,11 +45,8 @@ class Move:
self._wheel_l.run() self._wheel_l.run()
self._wheel_r.run() self._wheel_r.run()
while(self._bumper.value() == False): while(self._bumper.value() == False):
# this does not work... why? self._wheel_l.speed_set(48 - self._sensor.getbrightness())
# self._wheel_l.speed_set(ne) self._wheel_r.speed_set(self._sensor.getbrightness())
# self._wheel_r.speed_set(newspeed)
self._wheel_l._motor.duty_cycle_sp = 48 - self._sensor.getbrightness()
self._wheel_r._motor.duty_cycle_sp = self._sensor.getbrightness()
if(isknownstation == False): if(isknownstation == False):
pass #run odometry stuff here pass #run odometry stuff here
self._wheel_l.stop() self._wheel_l.stop()

View file

@ -50,6 +50,7 @@ class Planet:
def __init__(self): def __init__(self):
""" Initializes the data structure """ """ Initializes the data structure """
self._planetmap = {} self._planetmap = {}
self._curnode = None
self.target = None self.target = None
# Adds a bidirectional path defined between the start and end coordinates to the map and assigns the weight to it. # Adds a bidirectional path defined between the start and end coordinates to the map and assigns the weight to it.
@ -66,6 +67,12 @@ class Planet:
def get_paths(self) -> Dict[Tuple[int, int], Dict[Direction, Tuple[Tuple[int, int], Direction, Weight]]]: def get_paths(self) -> Dict[Tuple[int, int], Dict[Direction, Tuple[Tuple[int, int], Direction, Weight]]]:
return self._planetmap return self._planetmap
def setcurnode(self, node):
self._curnode = node
def getcurnode(self):
return self._curnode
''' '''
Returns a shortest path between two nodes. Returns a shortest path between two nodes.
Used Algorithm: Dijkstra's Algorithm Used Algorithm: Dijkstra's Algorithm

View file

@ -71,6 +71,7 @@ class YourFirstTestPlanet(unittest.TestCase):
self.planet.add_path(((1, 0), Direction.NORTH), ((2, 2), Direction.SOUTH), 2) self.planet.add_path(((1, 0), Direction.NORTH), ((2, 2), Direction.SOUTH), 2)
self.planet.add_path(((1, 0), Direction.EAST), ((3, 0), Direction.WEST), 1) self.planet.add_path(((1, 0), Direction.EAST), ((3, 0), Direction.WEST), 1)
self.planet.add_path(((2, 2), Direction.NORTH), ((2, 3), Direction.SOUTH), 3) self.planet.add_path(((2, 2), Direction.NORTH), ((2, 3), Direction.SOUTH), 3)
self.planet.add_path(((2, 2), Direction.NORTH), ((3, 2), Direction.SOUTH), 1)
self.planet.add_path(((3, 0), Direction.EAST), ((4, 0), Direction.WEST), -1) self.planet.add_path(((3, 0), Direction.EAST), ((4, 0), Direction.WEST), -1)
self.planet.add_path(((3, 2), Direction.NORTH), ((3, 2), Direction.SOUTH), 1) self.planet.add_path(((3, 2), Direction.NORTH), ((3, 2), Direction.SOUTH), 1)