Commit a6731aab authored by Julian Rudolf

Merge remote-tracking branch 'origin/dev'

parents cc5ea092 03e08974
from snake_logic import step, reset, render, init_game
from state import State
import pickle
import random


class QLearning:
    # contains all parameters needed for qlearning
    # alpha: learning rate
    # gamma: discount factor
    # qTable: maps (state, action) pairs to learned values
    # rounds: number of rounds to be played
    # step_sum: how many times the agent stepped in one round
    # reward_sum: sum of rewards in one round
    # epsilon: exploration factor
    # poss_dirs: possible directions in one step
    def __init__(self, alpha_, g_, epsilon_):
        self.alpha = alpha_
        self.gamma = g_
        self.qTable = {}
        self.rounds = 0
        self.step_sum = 0
        self.reward_sum = 0
        self.epsilon = epsilon_
        self.poss_dirs = 0

    # calculates the new qTable entry for (state, action)
    def update(self, state, next_state, action, reward):
        self.qTable[(state, action)] = (1 - self.alpha) * self.qTable[(state, action)] + \
            self.alpha * reward + \
            self.alpha * self.gamma * self.max_val(next_state)
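
    # This is the standard Q-learning update:
    # Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (reward + gamma * max_a' Q(s', a'))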

    # calculates the max qTable value for a given state
    def max_val(self, state):
        poss = []
        actions = self.possible_directions()
        for action in actions:
            if (state, action) in self.qTable:
                poss.append(self.qTable[(state, action)])
            else:
                poss.append(0)
        return max(poss)

    # determines the best direction to travel; assigns 0 to a qTable entry if not present yet
    def best_direction(self, state, poss_actions):
        best_val = -100000
        best_dir = "none"
        for dir in poss_actions:
            if (state, dir) not in self.qTable:
                self.qTable[(state, dir)] = 0
            val = self.qTable[(state, dir)]
            #print(dir, " -> ", val)
            if val > best_val:
                best_dir = dir
                best_val = val
        # print("dfl, dfr, dfu, dfd, killl, killr, killu, killd")
        # print(state.df_l, state.df_r, state.df_u, state.df_d)#, " ", state.kill_l, state.kill_r, state.kill_u, state.kill_d)
        # print("Action chosen: ", best_dir)
        # print(self.epsilon)
        # print("-----------------------------------------------------")
        return best_dir

    # chooses which direction to travel (epsilon-greedy)
    def choose_direction(self, state):
        poss_actions = self.possible_directions()
        rand_action = random.choice(poss_actions)
        best_direction = self.best_direction(state, poss_actions)
        self.epsilon = self.epsilon - 0.00005
        if random.random() > self.epsilon:
            #print("agent chose ", best_direction)
            return best_direction
        else:
            #print("exploration ", rand_action)
            return rand_action

    # returns all possible actions
    def possible_directions(self):
        poss_actions = []
        # if not state.kill_l:
        #     poss_actions.append("left")
        # if not state.kill_r:
        #     poss_actions.append("right")
        # if not state.kill_u:
        #     poss_actions.append("up")
        # if not state.kill_d:
        #     poss_actions.append("down")
        if self.poss_dirs[0]:
            poss_actions.append("left")
        if self.poss_dirs[1]:
            poss_actions.append("right")
        if self.poss_dirs[2]:
            poss_actions.append("up")
        if self.poss_dirs[3]:
            poss_actions.append("down")
        return poss_actions
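

# Example (hypothetical values): an agent created as QLearning(0.1, 0.5, 0.6)
# explores roughly 60% of the time at first; epsilon then decays by 0.00005 on
# every call to choose_direction, so the policy gradually becomes greedy.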

# plays one game for learning; the returned value encodes the winner (see learning below)
def play_game_learning(qagent, q_0, poss_dirs_0):
    state = q_0
    qagent.poss_dirs = poss_dirs_0
    qagent.step_sum = 0
    qagent.reward_sum = 0
    game_over = False
    while not game_over:
        action = qagent.choose_direction(state)
        next_state, reward, game_over, poss_dirs = step(action)
        qagent.poss_dirs = poss_dirs
        qagent.step_sum += 1
        qagent.reward_sum += reward
        qagent.update(state, next_state, action, reward)
        state = next_state
    return game_over


# main learning function
def learning(al, ga, ep, games):
    max_games = games
    score = [0, 0]
    qagent = QLearning(al, ga, ep)
    q_0, poss_dirs_0 = init_game()
    print("Starting learning process!")
    for i in range(max_games):
        playerwin = play_game_learning(qagent, q_0, poss_dirs_0)
        q_0, poss_dirs_0 = reset()
        print("Game ", i+1, "/", max_games, " finished!")
        if playerwin == 1:
            score[0] += 1
        elif playerwin == 2:
            score[1] += 1
        # print("Round ", i+1)
        # print("Reward for this game: ", qagent.reward_sum)
        # print("Agent stepped ", qagent.step_sum, " times!")
        # print("Epsilon = ", qagent.epsilon)
        # #render()
        # print("-------------------------------------------")
    print("Score after learning: ", score)
    return qagent, q_0, poss_dirs_0


# chooses the best direction based on the qTable
def choose_best_direction(qagent, state):
    dirs = qagent.possible_directions()
    best_dir = "none"
    best_val = -100000
    for dir in dirs:
        if (state, dir) in qagent.qTable:
            val = qagent.qTable[(state, dir)]
            if val > best_val:
                best_val = val
                best_dir = dir
        else:
            print("(state, dir) not in qtable")
    if best_val == -100000:
        return random.choice(dirs)
    return best_dir


# plays one game with the given qTable
def play_game_testing(qagent, q_0, poss_dirs_0, rand):
    state = q_0
    qagent.poss_dirs = poss_dirs_0
    game_over = False
    while not game_over:
        if rand:
            actions = qagent.possible_directions()
            action = random.choice(actions)
        else:
            action = choose_best_direction(qagent, state)
        state, _, game_over, poss_dirs = step(action)
        qagent.poss_dirs = poss_dirs
    return game_over


# lets the agent play against the shield snake to test how the agent performs
def testing(qtable, games, q_0, poss_dirs_0, rand=False):
    max_games = games
    score = [0, 0]
    reset()
    if rand:
        print("Starting random games!")
    else:
        print("Starting testing process!")
    for i in range(max_games):
        playerwin = play_game_testing(qtable, q_0, poss_dirs_0, rand)
        q_0, poss_dirs_0 = reset()
        print("Testing Game ", i+1, "/", max_games, " finished!")
        if playerwin == 1:
            score[0] += 1
        elif playerwin == 2:
            score[1] += 1
    print("----------------------------------------")
    if rand:
        print("Random games finished!")
    else:
        print("Learned games finished!")
    print("Score:")
    print(score)
    print("----------------------------------------")


# saves the qTable in the "qTable.pkl" pickle file
def save_qtable(qtable):
    with open("qTable.pkl", "wb") as f:
        pickle.dump(qtable, f)


# loads and returns the qTable from the pickle file
def load_qtable():
    with open("../Agent/qTable.pkl", "rb") as f:
        qtable = pickle.load(f)
    return qtable
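

# Note: save_qtable writes to ./qTable.pkl while load_qtable reads
# ../Agent/qTable.pkl, so load_qtable appears intended to be called from a
# sibling directory of Agent/ (e.g. the Game module), not from Agent/ itself.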
alpha = 0.1
gamma = 0.5
epsilon = 0.6
rounds = 800
#agent, q0, poss_dirs0 = learning(alpha, gamma, epsilon, rounds)
#save_qtable(agent.qTable)
#testing(agent, 50, q0, poss_dirs0, True)
#testing(agent, 50, q0, poss_dirs0)
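
# A hypothetical sketch of reusing a previously saved table instead of
# learning from scratch (epsilon 0 disables exploration):
# qagent = QLearning(alpha, gamma, 0.0)
# qagent.qTable = load_qtable()
# q0, poss_dirs0 = init_game()
# testing(qagent, 50, q0, poss_dirs0)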
import Game.maps as maps

map = maps.map3
block_size = 30
head_e = 0
dir_e = 0


class State:
    # parameters:
    # df_l, df_r, df_u, df_d :
    #     ranks in which direction (left, right, up, down) the nearest apple is,
    #     from 0 (nearest) to 3 (furthest)
    # kill_l, kill_r, kill_u, kill_d :
    #     describe in which direction a crash would be inevitable
    #     binary: 0 no crash || 1 crash
    #     (computed in calc_state_space below; not stored on the State itself)
    def __init__(self, df_l_, df_r_, df_u_, df_d_):
        self.df_l = df_l_
        self.df_r = df_r_
        self.df_d = df_d_
        self.df_u = df_u_

    def __hash__(self):
        return hash((self.df_l, self.df_r, self.df_d, self.df_u))

    def __eq__(self, other):
        return (self.df_l, self.df_r, self.df_d, self.df_u) == \
               (other.df_l, other.df_r, other.df_d, other.df_u)
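

# Because State defines __hash__ and __eq__, it can be used directly inside a
# qTable key, e.g. (hypothetical values):
# q = {}
# q[(State(0, 1, 2, 3), "left")] = 0.0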


# checks which directions are possible
def get_surround(x, y):
    l, r, u, d = 0, 0, 0, 0
    x = int(x)
    y = int(y)
    if map[y][x] == 1:
        assert False, 'position not path'
    # the 30x30 map wraps around at its borders, so the outward direction of a
    # border cell is always treated as open
    if x < 29:
        if map[y][x+1] == 0:
            r = 1
    elif x == 29:
        r = 1
    if y < 29:
        if map[y+1][x] == 0:
            d = 1
    elif y == 29:
        d = 1
    if x > 0:
        if map[y][x-1] == 0:
            l = 1
    elif x == 0:
        l = 1
    if y > 0:
        if map[y-1][x] == 0:
            u = 1
    elif y == 0:
        u = 1
    return l, r, u, d


# searches recursively for apples in one direction and returns the steps
# saves the traveled steps in max_steps if smaller
# if traveling longer than max_steps, abort because only the smallest number of steps is needed
def search(dir, head, apples, steps, max_steps=50, kill=0, crossing=False):
    cont = True
    x = head[1]
    y = head[0]
    if dir == "up":
        while cont:
            if y == 0:
                y = 29
            else:
                y -= 1
            steps += 1
            if head_e == (y, x) and dir_e == "down" and not crossing:
                kill = 1
            for pos in apples:
                if pos == (x, y):
                    max_steps = steps
                    return max_steps, kill
            if steps > max_steps:
                return max_steps, kill
            l, r, u, d = get_surround(x, y)
            if l+r+u+d > 2:
                crossing = True
            if u == 0:
                cont = False
            if l == 1:
                max_steps, kill = search("left", (y, x), apples, steps, max_steps, kill, crossing)
            if r == 1:
                max_steps, kill = search("right", (y, x), apples, steps, max_steps, kill, crossing)
    elif dir == "down":
        while cont:
            if y == 29:
                y = 0
            else:
                y += 1
            steps += 1
            if head_e == (y, x) and dir_e == "up" and not crossing:
                kill = 1
            for pos in apples:
                if pos == (x, y):
                    max_steps = steps
                    return max_steps, kill
            if steps > max_steps:
                return max_steps, kill
            l, r, u, d = get_surround(x, y)
            if l+r+u+d > 2:
                crossing = True
            if d == 0:
                cont = False
            if l == 1:
                max_steps, kill = search("left", (y, x), apples, steps, max_steps, kill, crossing)
            if r == 1:
                max_steps, kill = search("right", (y, x), apples, steps, max_steps, kill, crossing)
    elif dir == "left":
        while cont:
            if x == 0:
                x = 29
            else:
                x -= 1
            steps += 1
            if head_e == (y, x) and dir_e == "right" and not crossing:
                kill = 1
            for pos in apples:
                if pos == (x, y):
                    max_steps = steps
                    return max_steps, kill
            if steps > max_steps:
                return max_steps, kill
            l, r, u, d = get_surround(x, y)
            if l+r+u+d > 2:
                crossing = True
            if l == 0:
                cont = False
            if d == 1:
                max_steps, kill = search("down", (y, x), apples, steps, max_steps, kill, crossing)
            if u == 1:
                max_steps, kill = search("up", (y, x), apples, steps, max_steps, kill, crossing)
    elif dir == "right":
        while cont:
            if x == 29:
                x = 0
            else:
                x += 1
            steps += 1
            if head_e == (y, x) and dir_e == "left" and not crossing:
                kill = 1
            for pos in apples:
                if pos == (x, y):
                    max_steps = steps
                    return max_steps, kill
            if steps > max_steps:
                return max_steps, kill
            l, r, u, d = get_surround(x, y)
            if l+r+u+d > 2:
                crossing = True
            if r == 0:
                cont = False
            if d == 1:
                max_steps, kill = search("down", (y, x), apples, steps, max_steps, kill, crossing)
            if u == 1:
                max_steps, kill = search("up", (y, x), apples, steps, max_steps, kill, crossing)
    else:
        assert False, 'direction unknown'
    return max_steps, kill
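

# Example: search("up", head, apples, 0) walks upward from head == (y, x),
# recursing into side corridors, and returns (steps, kill): steps is the
# shortest path found to an apple (capped at max_steps=50) and kill == 1 if
# the enemy head was met head-on before the first crossing.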


# calculates the nearest apple in all directions
# determines whether the enemy snake travels towards the agent snake on neighboring edges
# returns a State object
def calc_state_space(snake_agent, snake_enemy, apples):
    global head_e
    global dir_e
    head = snake_agent.pos[1] / block_size, snake_agent.pos[0] / block_size
    dir = snake_agent.direction
    head_e = snake_enemy.pos[1] / block_size, snake_enemy.pos[0] / block_size
    dir_e = snake_enemy.direction
    # head = snake_agent
    # dir = "left"
    # head_e = snake_enemy
    # dir_e = "up"
    dfl, dfr, dfu, dfd = -1, -1, -1, -1
    sl, sr, su, sd = 100, 101, 102, 103
    l, r, u, d = get_surround(head[1], head[0])
    # the direction opposite to the current heading is never searched;
    # it keeps its sentinel step count and is marked as a kill
    if dir == "up":
        sd = 103
        killd = 1
        if l: sl, killl = search("left", head, apples, 0)
        else: killl = 1
        if r: sr, killr = search("right", head, apples, 0)
        else: killr = 1
        if u: su, killu = search("up", head, apples, 0)
        else: killu = 1
    elif dir == "down":
        su = 102
        killu = 1
        if l: sl, killl = search("left", head, apples, 0)
        else: killl = 1
        if r: sr, killr = search("right", head, apples, 0)
        else: killr = 1
        if d: sd, killd = search("down", head, apples, 0)
        else: killd = 1
    elif dir == "left":
        sr = 101
        killr = 1
        if l: sl, killl = search("left", head, apples, 0)
        else: killl = 1
        if d: sd, killd = search("down", head, apples, 0)
        else: killd = 1
        if u: su, killu = search("up", head, apples, 0)
        else: killu = 1
    elif dir == "right":
        sl = 100
        killl = 1
        if d: sd, killd = search("down", head, apples, 0)
        else: killd = 1
        if r: sr, killr = search("right", head, apples, 0)
        else: killr = 1
        if u: su, killu = search("up", head, apples, 0)
        else: killu = 1
    else:
        assert False, 'direction unknown'
    # rank the four step counts from 0 (nearest apple) to 3 (furthest)
    dirs = sorted([sl, sd, sr, su])
    for i in range(4):
        if dirs[i] == sl and dfl == -1:
            dfl = i
        elif dirs[i] == sr and dfr == -1:
            dfr = i
        elif dirs[i] == sd and dfd == -1:
            dfd = i
        elif dirs[i] == su and dfu == -1:
            dfu = i
    # print("Number of steps in each direction:")
    # print("l, r, u, d")
    # print(str(sl) + ", " + str(sr) + ", " + str(su) + ", " + str(sd))
    # print(dir)
    # print("Ranking:")
    # print(str(dfl) + ", " + str(dfr) + ", " + str(dfu) + ", " + str(dfd))
    # print("Kill:")
    # print(killl, killr, killu, killd)
    if dfl == -1 or dfd == -1 or dfu == -1 or dfr == -1:
        assert False, 'directions were not set'
    return State(dfl, dfr, dfu, dfd)
# head = (5,6)
# enemy = (8,6)
# app = ((17,28), (20,9), (17,29))
# f = calc_state_space(head, enemy, app)
# print("dfl, dfr, dfu, dfd, killl, killr, killu, killd")
# print(f.df_l, f.df_r, f.df_u, f.df_d, " ", f.kill_l, f.kill_r, f.kill_u, f.kill_d)
from Agent.snake_logic import init_game, reset, step
import sys


# calculates mean and worst case over 200 values
def calculate_times(filename):
    time_sum = 0
    with open("../Experiments/" + filename, "r") as time_file:
        times = time_file.readlines()[:200]
    worst_case = -1
    for time in times:
        time = float(time)
        time_sum += time
        if time > worst_case:
            worst_case = time
    mean_time = time_sum / len(times)
    return mean_time, worst_case
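

# Example: with one duration per line in ../Experiments/times.txt,
# mean, worst = calculate_times("times.txt")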


# runs one experiment
# max_steps : for how many steps the shield calculates crash probabilities
# length : length of the snake
# source_filename : times file from which mean and worst case are calculated
# target_filename : final times file in which all means and worst cases are saved
def run_experiment(max_steps, length, source_filename, target_filename):
    init_game(int(max_steps), int(length), True, source_filename)
    for i in range(200):
        game_over = step("left", True)
        if game_over:
            reset(True, int(length))
    mean, worst_case = calculate_times(source_filename)
    string = str(mean).ljust(25) + " " + str(worst_case) + "\n"
    with open(target_filename, "a") as target_file:
        target_file.write(string)


# run_experiment(30, 10, "test.txt", "test2.txt")
if __name__ == '__main__':
    globals()[sys.argv[1]](sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5])
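
# Example invocation (matching the shell script below):
#   python experiment.py run_experiment 30 10 times.txt experiments.txt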
#!/bin/bash
# In this shell script, experiments are run and their results saved in TARGET_FILE.
# To run it, PYTHONPATH needs to point to the 2psnake directory.
# Each experiment calls the Python function "run_experiment" in Experiments/experiment.py,
# which plays the snake game with one shield activated and random actions.
# Parameters:
# max_steps : for how many steps the SHIELD calculates the corresponding crash probabilities
# length : the length of the snakes
# source_filename : file where the game saves the calculated times
# target_filename : file where the experiment handler saves mean and worst case
#
# Experiments:
# Time for the SHIELD to calculate probabilities in process time (mean and worst case)
#
export PYTHONPATH='/home/julian/PycharmProjects/2psnake'

TARGET_FILE="experiments.txt"
SOURCE_FILE="times.txt"

# remove old experiment files (-f: ignore missing files on the first run)
rm -f $TARGET_FILE
rm -f $SOURCE_FILE

echo "Mean                      Worst-Case" >> $TARGET_FILE
echo "-------------------------------------------" >> $TARGET_FILE
echo "Starting experiments!"

for i in 10 15 20
do
    echo "Snake length: " $i >> $TARGET_FILE
    for j in {10..30}
    do
        echo "Snake length: " $i " max_steps: " $j " calculating ..."
        python experiment.py run_experiment $j $i $SOURCE_FILE $TARGET_FILE
        rm -f $SOURCE_FILE
    done
done
@@ -3,10 +3,14 @@ import pygame
import random
import maps
import multiprocessing as mp
import pickle
sys.path.append("../Shield")
sys.path.append("../Agent")
from shield import ShieldSnake, get_shield, find_node, init_shield
from graph_from_map import generate_graph
from Agent.state import calc_state_space, State
from Agent.agent import load_qtable
# -------------------------------------------------------------------
@@ -114,6 +118,7 @@ startsnake1, startsnake2 = maps.start6
map = maps.map6
snake_length = 15
tmp_snake_length = 15
agent_control = False
apples = [(set([]), green_apple), (set([]), purple_apple)]
#bombs = [(set([]), bomb_img)]
@@ -138,6 +143,7 @@ class Snake:
# direction : direction of the snake at the crossing
# shield_snake : the player snake in shield logic
# enemy_snake : the enemy snake in shield logic
# enemy_norm_snake : the enemy snake object