Commit 31699b20 authored by Julian Rudolf's avatar Julian Rudolf
Browse files

started implementing functions for automated experiments

parent 583c7dc5
......@@ -46,7 +46,6 @@ map_filename = '../Shield/maps/map3.pickle'
startsnake1, startsnake2 = maps.start3
map = maps.map3
snake_length = 15
tmp_snake_length = 15
apples = [(set([]), 0), (set([]), 1)]
overall_score = [0, 0]
......@@ -456,7 +455,7 @@ def gen_rand_apple(id):
# initalizes game
def init_game():
def init_game(max_steps=15, length=15, experiment=False, filename=False):
global snake1, snake2
# set apples on the map
for a, id in apples:
......@@ -465,19 +464,20 @@ def init_game():
a.add(apple)
# initialize shield
init_shield(map_filename)
init_shield(map_filename, max_steps, filename)
# modify start pos
j = 20 - snake_length
j = len(startsnake1["pos"]) - length
for i in range(j):
startsnake1["pos"].pop()
startsnake1["vel"].pop()
startsnake2["pos"].pop()
startsnake2["vel"].pop()
print(len(startsnake1["pos"]))
print(length)
# create snakes
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, act_shield=False, length=snake_length)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=False, length=snake_length, dir=False)
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, act_shield=experiment, length=length)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=False, length=length, dir=False)
snake1.set_enemy_snake(snake2.shield_snake)
snake2.set_enemy_snake(snake1.shield_snake)
snake1.set_enemy_norm_snake(snake2)
......@@ -490,15 +490,15 @@ def init_game():
# step function
# inputs action and steps snake to next crossing
# reward +10 if apple eaten
# returns state, reward and if game is over
# pro step -1 reward, win +100, loose -100, apple +10
# print: win/loose, reward, steps
def step(action):
# if experiment, step function is used only for playing the game without ui
# returns True if game over, else False
def step(action, experiment=False):
global snake1, snake2
game_exit = False
playerwin = 0
action_done = False
init = False
reward = 0
while not game_exit:
......@@ -520,41 +520,46 @@ def step(action):
# print("Tie")
reward = 0
print("Apples eaten: ", snake1.getscore())
agent_apples = []
for a, id in apples:
for apple in a:
if id == 0:
agent_apples.append((int(apple.pos[0] / block_size), int(apple.pos[1] / block_size)))
left, right, up, down = snake1.get_surround(True)
poss_dirs = (left, right, up, down)
return calc_state_space(snake1, snake2, agent_apples), reward, playerwin, poss_dirs
if not experiment:
agent_apples = []
for a, id in apples:
for apple in a:
if id == 0:
agent_apples.append((int(apple.pos[0] / block_size), int(apple.pos[1] / block_size)))
left, right, up, down = snake1.get_surround(True)
poss_dirs = (left, right, up, down)
return calc_state_space(snake1, snake2, agent_apples), reward, playerwin, poss_dirs
else:
return True
poss_actions = ("left", "right", "up", "down")
# agent controlled snake
if not action_done:
# print("direction: " + snake1.direction)
if action == "left":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "right":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "down":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "up":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "init":
init = True
if not experiment:
# print("direction: " + snake1.direction)
if action == "left":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "right":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "down":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "up":
snake1.key_event(action)
# print("Agent chose " + action)
elif action == "init":
init = True
else:
print(action)
assert False, 'action not known'
else:
print(action)
assert False, 'action not known'
snake1.key_event(random.choice(poss_actions))
action_done = True
# snake 2 acts random
if check_if_crossing(snake2.pos[0] / block_size, snake2.pos[1] / block_size):
poss_actions = ("left", "right", "up", "down")
snake2.key_event(random.choice(poss_actions))
# determine if a crash happened
......@@ -574,14 +579,17 @@ def step(action):
playerwin = 2
if check_if_crossing(snake1.pos[0] / block_size, snake1.pos[1] / block_size):
agent_apples = []
for a, id in apples:
for apple in a:
if id == 0:
agent_apples.append((int(apple.pos[0] / block_size), int(apple.pos[1] / block_size)))
left, right, up, down = snake1.get_surround(True)
poss_dirs = (left, right, up, down)
return calc_state_space(snake1, snake2, agent_apples), reward, 0, poss_dirs
if not experiment:
agent_apples = []
for a, id in apples:
for apple in a:
if id == 0:
agent_apples.append((int(apple.pos[0] / block_size), int(apple.pos[1] / block_size)))
left, right, up, down = snake1.get_surround(True)
poss_dirs = (left, right, up, down)
return calc_state_space(snake1, snake2, agent_apples), reward, 0, poss_dirs
else:
return False
exit_game()
......@@ -635,7 +643,7 @@ def render():
print("")
def reset():
def reset(experiment=False, length=15):
global rounds
global apple_count
global snake1, snake2
......@@ -647,10 +655,10 @@ def reset():
apple = gen_rand_apple(id)
a.add(apple)
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, act_shield=False,
length=snake_length)
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, act_shield=experiment,
length=length)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=False,
length=snake_length, dir=False)
length=length, dir=False)
snake1.set_enemy_snake(snake2.shield_snake)
snake2.set_enemy_snake(snake1.shield_snake)
snake1.set_enemy_norm_snake(snake2)
......@@ -665,6 +673,3 @@ def reset():
def exit_game():
quit()
......@@ -143,6 +143,7 @@ class Snake:
# direction : direction of the snake at the crossing
# shield_snake : the player snake in shield logic
# enemy_snake : the enemy snake in shield logic
# enemy_norm_snake : the enemy snake object
def __init__(self, pos, vel, angle, id, img=green_head, color=green, apple_img=green_apple, act_shield=False, length=snake_length, dir=True):
self.pos = [pos[0][1] * block_size, pos[0][0] * block_size]
self.vel = [vel[0][0] * block_size, vel[0][1] * block_size]
......@@ -431,7 +432,7 @@ class Snake:
except AttributeError:
choice = True
if not choice:
if not choice and False:
if check_if_after_crossing(self.enemy_norm_snake.pos[1] / block_size,
self.enemy_norm_snake.pos[0] / block_size,
self.enemy_norm_snake.direction):
......@@ -442,19 +443,12 @@ class Snake:
self.process = mp.Process(target=get_shield,
args=(self.crash_prob_queue, ps, es, self.count, True))
self.process.start()
# print("ps: nodes " + str(self.shield_snake.nodes) + " edges " + str(self.shield_snake.edges) + ' dist ' + str(
# self.shield_snake.dist) + '\n' + 'es: nodes ' + str(self.enemy_snake.nodes) + " edges " + str(
# self.enemy_snake.edges) + 'dist ' + str(self.enemy_snake.dist) + '\n' + 'process ' + str(
# self.process.pid) + " started snake: " + str(self.id))
# print("pos ps: " + str([[posy // block_size, posx // block_size] for posx, posy in reversed(self.poslist)]))
# print("pos es: " + str([[posy // block_size, posx // block_size] for posx, posy in reversed(self.enemy_norm_snake.poslist)]))
if self.shield and self.process is None and self.shield_snake.shield_calc_dist():
ps = self.shield_snake.copy()
es = self.enemy_snake.copy()
self.process = mp.Process(target=get_shield, args=(self.crash_prob_queue, ps, es))
self.process.start()
# print("normal_print ps: nodes " + str(self.shield_snake.nodes) + " edges " + str(self.shield_snake.edges) + ' dist ' + str(self.shield_snake.dist) +'\n' + 'es: nodes ' + str(self.enemy_snake.nodes) + " edges " + str(self.enemy_snake.edges) + 'dist ' + str(self.enemy_snake.dist) + '\n' + 'process ' + str(self.process.pid) + " started snake: " + str(self.id))
self.wrap_around()
......@@ -1094,7 +1088,6 @@ def game_loop():
snake2.set_enemy_norm_snake(snake1)
if agent_control:
qtable = load_qtable()
snake2.shield = False
print("Agent got control!")
action_taken = False
......
......@@ -7,6 +7,7 @@ from shutil import copyfile
import multiprocessing
import stormpy
import uuid
import time
# -------------------------------------------------------------------
# shield.py
......@@ -36,11 +37,15 @@ map, edges, max, node_name_max, edge_name_max, dist_max, max_steps = None, None,
# minimal precalculation distance
shield_calc_dist_min = 7
times_filename = False
# initializes all the global parameters
# parameters:
# filename : name of file that has been created by graph_from_map
def init_shield(filename):
# steps : sets max_steps, explained above
# t_filename : name of file in with shield calculating times are saved
def init_shield(filename, steps=15, t_filename=False):
global map
global edges
global max
......@@ -48,10 +53,12 @@ def init_shield(filename):
global edge_name_max
global dist_max
global max_steps
global times_filename
map, edges, max, node_name_max, edge_name_max, dist_max = pickle.load(open(filename, 'rb'))
max_steps = 15
max_steps = steps
map = np.array(map)
max += 1
times_filename = t_filename
# creates the beginning of the prism mdp
......@@ -562,7 +569,6 @@ def check_files_prism_process_funct(file, dir, return_dict):
return_dict[dir] = result.at(initial_state)
# checks the prism files for the Pmin probability of a crash
# uses multiprocessing, one for each direction
# parameters:
......@@ -611,6 +617,7 @@ def remove_files(files, master_filename='master.nm'):
# return:
# crash_prob : probability of a crash for each possible direction
def get_shield(crash_prob_queue, snake_player, snake_enemy, count=0, recalc=False, p_move=0):
shield_start = time.process_time()
# master file name
master_filename = 'master_' + str(uuid.uuid4()) + '.nm'
......@@ -659,6 +666,13 @@ def get_shield(crash_prob_queue, snake_player, snake_enemy, count=0, recalc=Fals
# return the crash probabilities (multiprocessing)
crash_prob_queue.put(crash_prob)
shield_end = time.process_time()
total_time = shield_end - shield_start
if times_filename:
times_file = open("../Experiments/" + times_filename, "a")
times_file.write(str(total_time) + "\n")
return crash_prob
# example:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment