Commit ba1f6a9d authored by Julian Rudolf

debugging qlearning agent

parent 28b3c51e
from snake_logic import step, reset, render, init_game
from state import State
import pickle
import random
@@ -8,7 +9,12 @@ class QLearning:
# contains all parameters needed for qlearning
# alpha: learning rate
# gamma: discount factor
# qTable state x action
# qTable: state x action
# rounds: number of rounds to be played
# step_sum: how many times the agent stepped in one round
# reward_sum: sum of rewards in one round
# epsilon: exploration factor
# poss_dirs: (left, right, up, down) booleans marking which directions the agent may currently take
def __init__(self, alpha_, g_, epsilon_):
self.alpha = alpha_
self.gamma = g_
@@ -17,7 +23,7 @@ class QLearning:
self.step_sum = 0
self.reward_sum = 0
self.epsilon = epsilon_
self.num_states = 0
self.poss_dirs = 0
# calculates new qTable entry
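# A hedged sketch of the standard tabular Q-learning rule this presumably implements
# (the body lies outside this hunk; only alpha, gamma, qTable and max_val from this class are assumed):
#   old = self.qTable.get((state, action), 0)
#   target = reward + self.gamma * self.max_val(next_state)
#   self.qTable[(state, action)] = old + self.alpha * (target - old)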
def update(self, state, next_state, action, reward):
@@ -34,7 +40,7 @@ class QLearning:
# calculates the max qTable value for a given state
def max_val(self, state):
poss = []
actions = possible_directions(state)
actions = self.possible_directions()
for action in actions:
if (state, action) in self.qTable:
poss.append(self.qTable[(state, action)])
@@ -66,10 +72,11 @@ class QLearning:
# function to choose which direction to travel
def choose_direction(self, state):
poss_actions = possible_directions(state)
poss_actions = self.possible_directions()
rand_action = random.choice(poss_actions)
best_direction = self.best_direction(state, poss_actions)
self.epsilon = self.epsilon - 0.0001
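# epsilon-greedy with linear decay: epsilon shrinks by 0.0001 per decision and has no
# lower bound, so once it reaches 0 the agent always exploits best_direction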
if random.random() > self.epsilon:
#print("agent chose ", best_direction)
return best_direction
@@ -81,80 +88,86 @@ class QLearning:
def print_table(self):
dirs = ("left", "right", "up", "down")
print(dirs)
i = 0
while True:
for tuple, value in self.qTable.items():
if tuple[0].id == i:
print(tuple[1], "(", value, ")", " | ", end='')
print(" ")
i += 1
if i > self.num_states:
break
# returns all possible actions
# TODO: maybe possible != not kill
def possible_directions(state):
poss_actions = []
if not state.kill_l:
poss_actions.append("left")
if not state.kill_r:
poss_actions.append("right")
if not state.kill_u:
poss_actions.append("up")
if not state.kill_d:
poss_actions.append("down")
return poss_actions
# returns all possible actions
# TODO: maybe possible != not kill
def possible_directions(self):
poss_actions = []
# if not state.kill_l:
# poss_actions.append("left")
# if not state.kill_r:
# poss_actions.append("right")
# if not state.kill_u:
# poss_actions.append("up")
# if not state.kill_d:
# poss_actions.append("down")
if self.poss_dirs[0]:
poss_actions.append("left")
if self.poss_dirs[1]:
poss_actions.append("right")
if self.poss_dirs[2]:
poss_actions.append("up")
if self.poss_dirs[3]:
poss_actions.append("down")
return poss_actions
# function to play one game for learning
def play_game_learning(qagent, q_0):
def play_game_learning(qagent, q_0, poss_dirs_0):
state = q_0
qagent.poss_dirs = poss_dirs_0
qagent.step_sum = 0
qagent.reward_sum = 0
# state.id = qagent.step_sum
game_over = False
while not game_over:
action = qagent.choose_direction(state)
next_state, reward, game_over = step(action)
next_state, reward, game_over, poss_dirs = step(action)
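# step() now returns the playerwin code instead of a boolean: 0 while the game is
# running, 1/2 for the winning snake, 3 for a tie, so game_over doubles as the result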
qagent.poss_dirs = poss_dirs
qagent.step_sum += 1
qagent.reward_sum += reward
# if next_state.id == -1:
# next_state.id = qagent.num_states + 1
# qagent.num_states = next_state.id
qagent.update(state, next_state, action, reward)
state = next_state
return game_over
# main learning function
def learning():
alpha = 0.1
gamma = 0.5
epsilon = 0.3
max_rounds = 100
qagent = QLearning(alpha, gamma, epsilon)
q_0 = init_game()
def learning(al, ga, ep, games):
max_games = games
score = [0, 0]
qagent = QLearning(al, ga, ep)
q_0, poss_dirs_0 = init_game()
print("Starting learning process!")
for i in range(max_rounds):
play_game_learning(qagent, q_0)
print("Round ", i+1)
print("Reward for this game: ", qagent.reward_sum)
print("Agent stepped ", qagent.step_sum, " times!")
# render()
print("-------------------------------------------")
for i in range(max_games):
playerwin = play_game_learning(qagent, q_0, poss_dirs_0)
print("Game ", i+1, "/", max_games, " finished!")
if playerwin == 1:
score[0] += 1
elif playerwin == 2:
score[1] += 1
# print("Round ", i+1)
# print("Reward for this game: ", qagent.reward_sum)
# print("Agent stepped ", qagent.step_sum, " times!")
# print("Epsilon = ", qagent.epsilon)
# #render()
# print("-------------------------------------------")
# qagent.print_table()
print(qagent.qTable)
return qagent.qTable
# print(qagent.qTable)
print("Score after learing: ", score)
return qagent, q_0, poss_dirs_0
# chooses best direction based on qtable
def choose_best_direction(qtable, state):
dirs = possible_directions(state)
def choose_best_direction(qagent, state):
dirs = qagent.possible_directions()
best_dir = "none"
best_val = -10000
for dir in dirs:
val = qtable[(state, dir)]
if (state, dir) in qagent.qTable:
val = qagent.qTable[(state, dir)]
else:
print("(state, dir) not in qtable")
val = -1000
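# low sentinel for unseen (state, dir) pairs so learned entries are normally preferred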
if val > best_val:
best_val = val
best_dir = dir
@@ -163,21 +176,75 @@ def choose_best_direction(qtable, state):
# plays one game with the given qtable
def play_game_testing(qtable, q_0):
def play_game_testing(qagent, q_0, poss_dirs_0, rand):
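# rand=True plays a uniform-random baseline over the allowed directions,
# otherwise the agent follows the learned qTable greedily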
state = q_0
qagent.poss_dirs = poss_dirs_0
game_over = False
while not game_over:
action = choose_best_direction(qtable, state)
if rand:
actions = qagent.possible_directions()
action = random.choice(actions)
else:
action = choose_best_direction(qagent, state)
state, _, game_over, poss_dirs = step(action)
qagent.poss_dirs = poss_dirs
return game_over
# let the agent play against the shield snake to test how the agent performs
def testing(qtable):
max_rounds = 10
q_0 = init_game()
for i in range(max_rounds):
print("Round ", i, " started!")
play_game_testing(qtable, q_0)
# let the agent play against the shield snake to test how the agent performs
def testing(qtable, games, q_0, poss_dirs_0, rand=False):
max_games = games
score = [0, 0]
reset()
if rand:
print("Starting random games!")
else:
print("Starting testing process!")
for i in range(max_games):
playerwin = play_game_testing(qtable, q_0, poss_dirs_0, rand)
print("Testing Game ", i+1, "/", max_games, " finished!")
if playerwin == 1:
score[0] += 1
elif playerwin == 2:
score[1] += 1
if rand:
print("----------------------------------------")
print("Random games finished!")
print("Score:")
print(score)
print("----------------------------------------")
else:
print("----------------------------------------")
print("Learned games finished!")
print("Score:")
print(score)
print("----------------------------------------")
# saves qtable in "qTable.pkl" pickle file
def save_qtable(qtable):
f = open("qTable.pkl", "wb")
pickle.dump(qtable, f)
f.close()
# loads and returns qTable from pickle file
def load_qtable():
f = open("../Agent/qTable.pkl", "rb")
qtable = pickle.load(f)
f.close()
return qtable
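# note: save_qtable writes qTable.pkl into the current working directory, while
# load_qtable reads it via ../Agent/, so it is apparently meant to be called from the Game side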
alpha = 0.1
gamma = 0.5
epsilon = 0.3
rounds = 800
#agent, q0, poss_dirs0 = learning(alpha, gamma, epsilon, rounds)
#save_qtable(agent.qTable)
#testing(agent, 50, q0, poss_dirs0, True)
#testing(agent, 50, q0, poss_dirs0)
table = learning()
#testing(table)
@@ -2,6 +2,7 @@ import sys
import random
import Game.maps as maps
import multiprocessing as mp
import pickle
sys.path.append("../Shield")
sys.path.append("../Agent")
@@ -160,7 +161,7 @@ class Snake:
# returns left, right, up, down
# True if direction is allowed
def get_surround(self):
def get_surround(self, agent=False):
if self.pos[0] - block_size < 0:
left = (map[int(self.pos[1] // path)][int((res_x - block_size) // path)] == 0)
else:
@@ -177,6 +178,17 @@ class Snake:
down = (map[0][int((self.pos[0]) // path)] == 0)
else:
down = (map[int((self.pos[1] + block_size) // path)][int((self.pos[0]) // path)] == 0)
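# for the agent's queries the snake may not reverse onto itself, so the direction
# opposite to the current travel direction is additionally disallowed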
if agent:
if self.direction == "left":
right = False
elif self.direction == "right":
left = False
elif self.direction == "up":
down = False
elif self.direction == "down":
up = False
return left, right, up, down
# automatically turns the snake if it reaches a corner or crossing
@@ -465,15 +477,15 @@ def init_game():
# create snakes
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, act_shield=False, length=snake_length)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=True, length=snake_length, dir=False)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=False, length=snake_length, dir=False)
snake1.set_enemy_snake(snake2.shield_snake)
snake2.set_enemy_snake(snake1.shield_snake)
snake1.set_enemy_norm_snake(snake2)
snake2.set_enemy_norm_snake(snake1)
# snake needs to be on a crossing for the shield,
# but the step function thinks it needs an action on the first crossing
state, _, _, = step("init")
return state
state, _, _, poss_dirs = step("init")
return state, poss_dirs
# step function
@@ -494,29 +506,32 @@ def step(action):
if playerwin != 0:
if playerwin == 1 or (playerwin == 3 and snake1.getscore() > snake2.getscore()):
overall_score[0] += 1
print("-------------------------------------------")
print("Snake 1 wins")
print(overall_score)
print("Apples eaten: ", snake1.getscore())
# print("-------------------------------------------")
# print("Snake 1 wins")
# print(overall_score)
# print("Apples eaten: ", snake1.getscore())
reward += 100
if playerwin == 2 or (playerwin == 3 and snake2.getscore() > snake1.getscore()):
overall_score[1] += 1
print("-------------------------------------------")
print("Snake 2 wins")
print(overall_score)
print("Apples eaten: ", snake1.getscore())
# print("-------------------------------------------")
# print("Snake 2 wins")
# print(overall_score)
# print("Apples eaten: ", snake1.getscore())
reward -= 100
if playerwin == 3 and snake1.getscore() == snake2.getscore():
print("Tie")
# print("Tie")
reward = 0
print("Apples eaten: ", snake1.getscore())
reset()
agent_apples = []
for a, id in apples:
for apple in a:
if id == 0:
agent_apples.append((int(apple.pos[0] / block_size), int(apple.pos[1] / block_size)))
return calc_state_space(snake1, snake2, agent_apples), reward, True
left, right, up, down = snake1.get_surround(True)
poss_dirs = (left, right, up, down)
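# game over: return the terminal state, the reward, the playerwin code
# (1 = snake1, 2 = snake2, 3 = tie) and the directions the agent could still take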
return calc_state_space(snake1, snake2, agent_apples), reward, playerwin, poss_dirs
# agent controlled snake
if not action_done:
@@ -544,19 +559,6 @@ def step(action):
if check_if_crossing(snake2.pos[0] / block_size, snake2.pos[1] / block_size):
poss_actions = ("left", "right", "up", "down")
snake2.key_event(random.choice(poss_actions))
# random_choice = random.randint(1,4)
# if random_choice == 1:
# snake2.key_event("left")
# # print("Random snake chose left(" + str(random_choice) + ")")
# elif random_choice == 2:
# snake2.key_event("right")
# # print("Random snake chose right(" + str(random_choice) + ")")
# elif random_choice == 3:
# snake2.key_event("down")
# # print("Random snake chose down(" + str(random_choice) + ")")
# elif random_choice == 4:
# snake2.key_event("up")
# # print("Random snake chose up(" + str(random_choice) + ")")
# determine if a crash happened
crash1 = snake1.update()
@@ -569,9 +571,9 @@ def step(action):
reward += 10
snake2.eat()
if snake1.getscore() == apple_win_count:
if snake1.getscore() >= apple_win_count:
playerwin = 1
if snake2.getscore() == apple_win_count:
if snake2.getscore() >= apple_win_count:
playerwin = 2
if check_if_crossing(snake1.pos[0] / block_size, snake1.pos[1] / block_size):
@@ -582,7 +584,9 @@ def step(action):
for apple in a:
if id == 0:
agent_apples.append((int(apple.pos[0] / block_size), int(apple.pos[1] / block_size)))
return calc_state_space(snake1, snake2, agent_apples), reward, False
left, right, up, down = snake1.get_surround(True)
poss_dirs = (left, right, up, down)
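# game still running: a 0 in the playerwin slot replaces the old False flag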
return calc_state_space(snake1, snake2, agent_apples), reward, 0, poss_dirs
exit_game()
@@ -650,7 +654,7 @@ def reset():
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, act_shield=False,
length=snake_length)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=True,
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, act_shield=False,
length=snake_length, dir=False)
snake1.set_enemy_snake(snake2.shield_snake)
snake2.set_enemy_snake(snake1.shield_snake)
@@ -16,7 +16,6 @@ class State:
# describes in which direction a crash would be inevitable
# binary: 0 no crash || 1 crash
def __init__(self, df_l_, df_r_, df_u_, df_d_, kill_l_, kill_r_, kill_u_, kill_d_):
# self.id = -1
self.df_l = df_l_
self.df_r = df_r_
self.df_d = df_d_
@@ -258,6 +257,7 @@ def calc_state_space(snake_agent, snake_enemy, apples):
return State(dfl, dfr, dfu, dfd, killl, killr, killu, killd)
# head = (5,6)
# enemy = (8,6)
# app = ((17,28), (20,9), (17,29))
@@ -3,10 +3,14 @@ import pygame
import random
import maps
import multiprocessing as mp
import pickle
sys.path.append("../Shield")
sys.path.append("../Agent")
from shield import ShieldSnake, get_shield, find_node, init_shield
from graph_from_map import generate_graph
from Agent.state import calc_state_space, State
from Agent.agent import load_qtable
# -------------------------------------------------------------------
@@ -114,6 +118,7 @@ startsnake1, startsnake2 = maps.start6
map = maps.map6
snake_length = 15
tmp_snake_length = 15
agent_control = False
apples = [(set([]), green_apple), (set([]), purple_apple)]
bombs = [(set([]), bomb_img)]
@@ -240,7 +245,7 @@ class Snake:
# returns left, right, up, down
# True if direction is allowed
def get_surround(self):
def get_surround(self, agent=False):
if self.pos[0] - block_size < 0:
left = (map[int(self.pos[1] // path)][int((res_x - block_size) // path)] == 0)
else:
@@ -257,6 +262,17 @@ class Snake:
down = (map[0][int((self.pos[0]) // path)] == 0)
else:
down = (map[int((self.pos[1] + block_size) // path)][int((self.pos[0]) // path)] == 0)
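# when queried for the agent, forbid reversing into the snake's own body
# (the direction opposite to the current travel direction is disallowed)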
if agent:
if self.direction == "left":
right = False
elif self.direction == "right":
left = False
elif self.direction == "up":
down = False
elif self.direction == "down":
up = False
return left, right, up, down
# automatically turns the snake if it reaches a corner or crossing
@@ -489,6 +505,30 @@ class Snake:
else:
self.crash_prob_draw.append((red, [pos[0], pos[1], block_size, block_size]))
# chooses action based on qTable
def choose_best_action(self, qtable, state):
dirs = []
l, r, u, d = self.get_surround(True)
if l: dirs.append("left")
if r: dirs.append("right")
if u: dirs.append("up")
if d: dirs.append("down")
best_dir = "none"
best_val = -10000
#print(dirs)
for dir in dirs:
if (state, dir) in qtable:
#print("Value found in qTable")
val = qtable[(state, dir)]
else:
print("(state, dir) not in qtable")
val = -1000
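# sentinel for unseen (state, dir) pairs so any learned entry is normally preferred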
if val > best_val:
best_val = val
best_dir = dir
return best_dir
# checks if there is a prob != 0.0%
def all_good_choice(crash_prob):
@@ -632,12 +672,20 @@ def button(text, pos, color1, color2, action, text_color=black):
click = pygame.mouse.get_pressed()
speed_select = ((int(res_x / 2 - 200), res_y - 300, 100, 30), (int(res_x / 2 - 320), res_y - 300, 100, 30), (int(res_x / 2 - 440), res_y - 300, 100, 30))
length_select = ((int(res_x / 2 + 340), res_y - 300, 100, 30), (int(res_x / 2 + 220), res_y - 300, 100, 30), (int(res_x / 2 + 100), res_y - 300, 100, 30))
global agent_control
if pos[0] + pos[2] > cur[0] > pos[0] and pos[1] + pos[3] > cur[1] > pos[1]:
pygame.draw.rect(Display, color2, pos)
if click[0] == 1:
if action == "switch":
return False
elif action == "agent":
clock.tick(6)
agent_control = not agent_control
clock.tick(6)
elif action == "length10":
pygame.draw.rect(Display, white, length_select[0], 8)
pygame.draw.rect(Display, white, length_select[1], 8)
@@ -814,7 +862,10 @@ def select_snake():
p2_pos = (p2_pos[0], p2_pos[1] + 330)
p2.move_ip(0, +330)
if p1_pos == p2_pos:
if agent_control:
pygame.draw.rect(Display, white, p2, 5)
pygame.draw.rect(Display, green, p1, 5)
elif p1_pos == p2_pos:
pygame.draw.rect(Display, black, p1, 5)
else:
pygame.draw.rect(Display, green, p1, 5)
@@ -829,15 +880,19 @@ def select_snake():
elif p1_pos == pos4:
p1_snake = red_head, red, red_start, red_win, red_win_end, red_lost, red_lost_end
if p2_pos == pos1:
p2_snake = green_head, green, green_start, green_win, green_win_end, green_lost, green_lost_end
elif p2_pos == pos2:
if not agent_control:
if p2_pos == pos1:
p2_snake = green_head, green, green_start, green_win, green_win_end, green_lost, green_lost_end
elif p2_pos == pos2:
p2_snake = bot_head, very_light_grey, bot_start, bot_win, bot_win_end, bot_lost, bot_lost_end
elif p2_pos == pos3:
p2_snake = purple_head, purple, purple_start, purple_win, purple_win_end, purple_lost, purple_lost_end
elif p2_pos == pos4:
p2_snake = red_head, red, red_start, red_win, red_win_end, red_lost, red_lost_end
else:
p2_snake = bot_head, very_light_grey, bot_start, bot_win, bot_win_end, bot_lost, bot_lost_end
elif p2_pos == pos3:
p2_snake = purple_head, purple, purple_start, purple_win, purple_win_end, purple_lost, purple_lost_end
elif p2_pos == pos4:
p2_snake = red_head, red, red_start, red_win, red_win_end, red_lost, red_lost_end
button("Play vs AI", (int(res_x / 2 - 60), res_y - 760, 120, 50), grey, light_grey, action="agent")
select = button("Got my Snake", (int(res_x / 2 - 60), res_y - 450, 120, 50), green, light_green, action="select_map")
clock.tick(30)
pygame.display.update()
@@ -1030,13 +1085,18 @@ def game_loop():
# create snakes
snake1 = Snake(startsnake1["pos"], startsnake1["vel"], startsnake1["angle"], 0, img=p1_snake[0], color=p1_snake[1],
apple_img=green_apple, act_shield=True, length=snake_length)
apple_img=green_apple, act_shield=False, length=snake_length)
snake2 = Snake(startsnake2["pos"], startsnake2["vel"], startsnake2["angle"], 1, img=p2_snake[0], color=p2_snake[1],
apple_img=purple_apple, act_shield=True, length=snake_length, dir=False)
snake1.set_enemy_snake(snake2.shield_snake)
snake2.set_enemy_snake(snake1.shield_snake)
snake1.set_enemy_norm_snake(snake2)
snake2.set_enemy_norm_snake(snake1)
if agent_control:
qtable = load_qtable()
snake2.shield = False
print("Agent got control!")
action_taken = False
# game loop
while not game_exit:
@@ -1113,14 +1173,16 @@ def game_loop():
elif not snake1.shield:
snake1.shield = True
if event.key == pygame.K_a: # Snake2
snake2.key_event("left")
if event.key == pygame.K_d:
snake2.key_event("right")
if event.key == pygame.K_s:
snake2.key_event("down")
if event.key == pygame.K_w:
snake2.key_event("up")
if not agent_control:
if event.key == pygame.K_a: # Snake2
snake2.key_event("left")
if event.key == pygame.K_d:
snake2.key_event("right")
if event.key == pygame.K_s:
snake2.key_event("down")
if event.key == pygame.K_w:
snake2.key_event("up")