글 수 22

텐서플로우에서 DQN 강화학습으로 캐치게임 구현

조회 수 1854 추천 수 0 2017.03.18 13:33:56


TensorFlowPlayCatchGame.gif

http://solarisailab.com/archives/486

 

 

 

강화학습의 한 방법인 DQN(Deep Q-Network)는 기존 Q-Learning에 신경망을 결합한 것입니다. Q-Learning은 상태집합에서 어떤 행동을 해야할지를 학습하는 알고리즘입니다.

 

현재 상태 s에서 행동 a를 했을때의 이득값 Q(s, a)을 알 수 있다면 모든 행동 a에 대해서 가장 높은 Q값을 갖는 행동을 수행하면 됩니다. Q(s, a)함수는 보통 테이블로 이루어져 있는데 (상태 x 행동) 배열이 바로 Q값을 나타냅니다. 예를 들어 위의 캐치게임의 Q테이블은 다음과 같이 정의할 수 있습니다.

 

과일 X좌표 * 과일 Y좌표 * 바구니 X좌표 * 바구니 행동(왼쪽, 대기, 오른쪽)

=> 10 * 10 * 10 * 3 = 3000

 

 

 

주어진 환경에서 반복적으로 동작을 하면서 아래와 같은 수식으로 Q값을 조정합니다.

 

figure3.jpg

 

r은 보상값(reward)이고 γ는 감소값(discount factor, 0~1)입니다. Q값은 보상값 r 더하기 다음에 할 수 있는 행동들의 Q값중 가장 높은 것에 감소값을 곱한값입니다.

 

이렇게 보상값이 점점 주변 상태로 조금씩 감소되면서 퍼져나갑니다. 그렇기 때문에 멀리 떨어진 상태에서도 목표를 향해 순차적으로 올바른 행동을 결정할 수 있습니다.

 

 

 

figure2.jpg

 

위의 Q테이블에서 화살표는 행동을 나타내는데 실제로는 이것도 배열로 표현됩니다. 여기서 목표 G에 도착하면 100의 보상값을 받습니다.

 

G의 왼쪽 상태에서 Q값은 0(보상값) + 0.9(감소값) * 100(행동중 가장 높은 Q값)으로 90입니다. G의 가장 왼쪽 상태에서 Q값은 0(보상값) + 0.9(감소값) * 90(행동중 가장 높은 Q값)으로 81입니다.

 

여러번 행동을 수행할수록 모든 상태에 대해서 Q테이블이 채워지게 됩니다. 그렇게 되면 오른쪽 그림처럼 각 상태마다 목표를 향한 최적의 행동을 결정할 수 있습니다.

 

 

 

캐치게임의 상태를 각 요소별이 아니라 화면의 픽셀정보만 사용해서 표현할 수도 있습니다.

 

픽셀당 상태 ^ (픽셀의 X좌표 * 픽셀의 Y좌표) * 바구니 행동 = 2 ^ (10 * 10) * 3

=> 3802951800684688204490109616128

 

보시다시피 상태의 개수가 엄청나기 때문에 배열로는 구현하기가 힘듭니다. 이처럼 상태집합이 클 경우에는 신경망을 사용할 수 있는데 그중에서도 딥러닝으로 Q테이블을 구하는 방법이 DQN입니다.

 

 

 

96f1ced9cae8f1ade00e311a2300687c.png

 

게임에서 픽셀정보를 입력으로 받아 딥러닝 CNN으로 학습을 합니다. 출력 결과는 행동의 개수만큼 나오는데 각 행동일때의 Q값을 나타냅니다.

 

 

 

캐치게임을 학습하는 코드는 아래와 같습니다. 원래 소스에서 getBatch()함수에 오류가 있는 것 같아 조금 수정하였습니다.

 

그리고 복잡한 게임이 아니라서 그런지 CNN 대신 히든레이어가 2개인 신경망이 사용되었습니다. 밑의 output_layer가 사실 hidden_layer2라고 보시면 됩니다.

 

 

 

-----------------------------------------------------------------

import tensorflow as tf
import numpy as np
import random
import math
import os

 

 

 

#----------------------------

# 파라미터 설정

#----------------------------
epsilon = 1  # 랜덤하게 행동할 확률
epsilonMinimumValue = 0.001  # epsilon의 최소값
nbActions = 3  # 행동의 개수 (왼쪽, 대기, 오른쪽)
epoch = 1001  # 게임 반복횟수
hiddenSize = 100  # 히든 레이어 뉴런 개수
maxMemory = 500  # 게임내용을 기억하는 최대 개수
batchSize = 50  # 학습시 데이터 묶음 개수
gridSize = 10  # 격자 크기
nbStates = gridSize * gridSize  # 상태 개수
discount = 0.9  # 감소값
learningRate = 0.2  # 학습률

 

 

 

#----------------------------

# 모델 설정

#----------------------------

# 입력 레이어

X = tf.placeholder(tf.float32, [None, nbStates])
W1 = tf.Variable(tf.truncated_normal([nbStates, hiddenSize], stddev=1.0 / math.sqrt(float(nbStates))))
b1 = tf.Variable(tf.truncated_normal([hiddenSize], stddev=0.01))  
input_layer = tf.nn.relu(tf.matmul(X, W1) + b1)

 

# 히든 레이어
W2 = tf.Variable(tf.truncated_normal([hiddenSize, hiddenSize],stddev=1.0 /

                                                                                   math.sqrt(float(hiddenSize))))
b2 = tf.Variable(tf.truncated_normal([hiddenSize], stddev=0.01))
hidden_layer = tf.nn.relu(tf.matmul(input_layer, W2) + b2)

 

#출력 레이어
W3 = tf.Variable(tf.truncated_normal([hiddenSize, nbActions],stddev=1.0 / math.sqrt(float(hiddenSize))))
b3 = tf.Variable(tf.truncated_normal([nbActions], stddev=0.01))
output_layer = tf.matmul(hidden_layer, W3) + b3

 

# 목표값 플레이스홀더
Y = tf.placeholder(tf.float32, [None, nbActions])

 

# 목표값과 출력값의 차이인 코스트
cost = tf.reduce_sum(tf.square(Y-output_layer)) / (2*batchSize)

 

# 경사하강법으로 코스트가 최소가 되는 값 찾음
optimizer = tf.train.GradientDescentOptimizer(learningRate).minimize(cost)

 

 

 

#----------------------------

# 랜덤값 구함

#----------------------------

def randf(s, e):
  return (float(random.randrange(0, (e - s) * 9999)) / 10000) + s;

 

 

 

#----------------------------

# 환경 클래스

#----------------------------

class CatchEnvironment():

 

  # 초기화
  def __init__(self, gridSize):
    self.gridSize = gridSize
    self.nbStates = self.gridSize * self.gridSize
    self.state = np.empty(3, dtype = np.uint8) 

 

  # 화면정보 리턴
  def observe(self):
    canvas = self.drawState()
    canvas = np.reshape(canvas, (-1,self.nbStates))
    return canvas

 

  #블럭과 바를 표시하여 화면정보 리턴

  def drawState(self):
    canvas = np.zeros((self.gridSize, self.gridSize))

 

    # 과일 표시
    canvas[self.state[0]-1, self.state[1]-1] = 1


    # 바구니 표시
    canvas[self.gridSize-1, self.state[2] -1 - 1] = 1
    canvas[self.gridSize-1, self.state[2] -1] = 1
    canvas[self.gridSize-1, self.state[2] -1 + 1] = 1    
    return canvas        

 

  # 과일과 바구니 위치 초기화
  def reset(self): 
    initialFruitColumn = random.randrange(1, self.gridSize + 1)
    initialBucketPosition = random.randrange(2, self.gridSize + 1 - 1)
    self.state = np.array([1, initialFruitColumn, initialBucketPosition]) 
    return self.getState()

 

  # 상태 리턴

  def getState(self):
    stateInfo = self.state
    fruit_row = stateInfo[0]
    fruit_col = stateInfo[1]
    basket = stateInfo[2]
    return fruit_row, fruit_col, basket

 

  # 보상값 리턴
  def getReward(self):
    fruitRow, fruitColumn, basket = self.getState()
    if (fruitRow == self.gridSize - 1):  # If the fruit has reached the bottom.
      if (abs(fruitColumn - basket) <= 1): # Check if the basket caught the fruit.
        return 1
      else:
        return -1
    else:
      return 0

 

  # 게임오버 검사

  def isGameOver(self):
    if (self.state[0] == self.gridSize - 1): 
      return True 
    else: 
      return False 

 

  # 상태 업데이트

  def updateState(self, action):
    if (action == 1):
      action = -1  # 왼쪽 이동
    elif (action == 2):
      action = 0  # 대기
    else:
      action = 1  # 오른쪽 이동
    fruitRow, fruitColumn, basket = self.getState()
    newBasket = min(max(2, basket + action), self.gridSize - 1)  # 바구니 위치 변경
    fruitRow = fruitRow + 1  # 과일을 아래로 이동
    self.state = np.array([fruitRow, fruitColumn, newBasket])

 

  # 행동 수행 (1->왼쪽, 2->대기, 3->오른쪽)

  def act(self, action):
    self.updateState(action)
    reward = self.getReward()
    gameOver = self.isGameOver()
    return self.observe(), reward, gameOver, self.getState() 

 

 

 

#----------------------------

# 메모리 클래스

# 게임내용을 저장하고 나중에 배치로 묶어 학습에 사용

#----------------------------

class ReplayMemory:

 

  # 초기화
  def __init__(self, gridSize, maxMemory, discount):
    self.maxMemory = maxMemory
    self.gridSize = gridSize
    self.nbStates = self.gridSize * self.gridSize
    self.discount = discount
    canvas = np.zeros((self.gridSize, self.gridSize))
    canvas = np.reshape(canvas, (-1,self.nbStates))
    self.inputState = np.empty((self.maxMemory, 100), dtype = np.float32)
    self.actions = np.zeros(self.maxMemory, dtype = np.uint8)
    self.nextState = np.empty((self.maxMemory, 100), dtype = np.float32)
    self.gameOver = np.empty(self.maxMemory, dtype = np.bool)
    self.rewards = np.empty(self.maxMemory, dtype = np.int8) 
    self.count = 0
    self.current = 0

 

  # 게임내용 추가
  def remember(self, currentState, action, reward, nextState, gameOver):
    self.actions[self.current] = action
    self.rewards[self.current] = reward
    self.inputState[self.current, ...] = currentState
    self.nextState[self.current, ...] = nextState
    self.gameOver[self.current] = gameOver
    self.count = max(self.count, self.current + 1)
    self.current = (self.current + 1) % self.maxMemory

 

  # 게임내용을 배치로 묶어서 리턴

  def getBatch(self, model, batchSize, nbActions, nbStates, sess, X):
    memoryLength = self.count
    chosenBatchSize = min(batchSize, memoryLength)

    inputs = np.zeros((chosenBatchSize, nbStates))
    targets = np.zeros((chosenBatchSize, nbActions))

 

    for i in xrange(chosenBatchSize):
      # 메모리에서 랜덤하게 선택

      randomIndex = random.randrange(0, memoryLength)
      current_inputState = np.reshape(self.inputState[randomIndex], (1, 100))

      target = sess.run(model, feed_dict={X: current_inputState})
      
      current_nextState = np.reshape(self.nextState[randomIndex], (1, 100))
      current_outputs = sess.run(model, feed_dict={X: current_nextState})      
      
      # 다음 상태의 최대 Q값
      nextStateMaxQ = np.amax(current_outputs)


      if (self.gameOver[randomIndex] == True):

        # 게임오버일때 Q값은 보상값으로 설정
        target[0, [self.actions[randomIndex]-1]] = self.rewards[randomIndex]
      else:

        # Q값을 계산
        # reward + discount(gamma) * max_a' Q(s',a')
        target[0, [self.actions[randomIndex]-1]] = self.rewards[randomIndex] +

                                                           self.discount * nextStateMaxQ

 

      inputs[i] = current_inputState
      targets[i] = target

    return inputs, targets

 

 

 

#----------------------------

# 메인 함수

#----------------------------

def main(_):
  print("Training new model")

  # 환경 정의
  env = CatchEnvironment(gridSize)

  # 메모리 정의
  memory = ReplayMemory(gridSize, maxMemory, discount)

  # 세이버 설정
  saver = tf.train.Saver()
  
  winCount = 0
  with tf.Session() as sess:   
    tf.initialize_all_variables().run() 

    for i in xrange(epoch):
      err = 0

      env.reset()
      isGameOver = False

      currentState = env.observe()
            
      while (isGameOver != True):
        action = -9999 


        # 랜덤으로 행동을 할지 Q값에 따라 행동할지 결정
        global epsilon
        if (randf(0, 1) <= epsilon):
          action = random.randrange(1, nbActions+1)
        else:          
          q = sess.run(output_layer, feed_dict={X: currentState})          
          index = q.argmax()
          action = index + 1     

 

        # 랜덤으로 행동할 확률 감소
        if (epsilon > epsilonMinimumValue):
          epsilon = epsilon * 0.999
        

        # 행동 수행
        nextState, reward, gameOver, stateInfo = env.act(action)
            

        # 승리 횟수 설정
        if (reward == 1):
          winCount = winCount + 1

 

        # 메모리에 저장

        memory.remember(currentState, action, reward, nextState, gameOver)
        

        # 다음 상태 설정
        currentState = nextState
        isGameOver = gameOver
                
        # 입력과 출력 데이터 배치를 구함
        inputs, targets = memory.getBatch(output_layer, batchSize, nbActions, nbStates, sess, X)
        
        # 학습 수행
        _, loss = sess.run([optimizer, cost], feed_dict={X: inputs, Y: targets})  
        err = err + loss

 

      print("Epoch " + str(i) + ": err = " + str(err) + ": Win count = " + str(winCount) +

            " Win ratio = " + str(float(winCount)/float(i+1)*100))

 

    # 모델 세션 저장
    save_path = saver.save(sess, os.getcwd()+"/model.ckpt")
    print("Model saved in file: %s" % save_path)

 

 

 

#----------------------------

# 메인 함수 실행

#----------------------------

f __name__ == '__main__':
  tf.app.run()

-----------------------------------------------------------------

 

 

 

< 인공지능 개발자 모임 >

- 페이스북 그룹에 가입하시면 인공지능에 대한 최신 정보를 쉽게 받으실 수 있습니다.

https://www.facebook.com/groups/AIDevKr/

 

List of Articles
제목 글쓴이 날짜 조회 수
텐센트의 스타크래프트2 인공지능 - TStarBot file 깊은바다 2018-09-21 229
tensorflow-js로 만들어본 RL 예제: 2. Deep-sarsa 깊은바다 2018-07-27 285
웹브라우저에서 자율주행차 강화학습 프로젝트 [2] 깊은바다 2018-06-21 299
새로운 강화학습 방식인 니마시니 알고리즘 깊은바다 2018-06-03 514
딥마인드의 모방을 통한 게임 학습 깊은바다 2018-06-01 247
강화학습의 개요 깊은바다 2018-05-28 174
스타2 강화학습 튜토리얼 [2] 깊은바다 2018-02-04 659
DQN 강화학습으로 슈퍼마리오 인공지능 구현 깊은바다 2017-10-30 1243
스타크래프트2 강화학습 깊은바다 2017-10-14 702
강화학습과 DQN 소개 깊은바다 2017-10-05 537
알아두면 쓸데있는 신기한 강화학습 깊은바다 2017-08-24 470
알파고를 탄생시킨 강화학습의 비밀 깊은바다 2017-06-25 406
신경망 Q-Learning으로 구현한 오목 게임 깊은바다 2017-03-29 5784
강화학습 Q-Learning과 DQN에 대한 설명 깊은바다 2017-03-27 3384
텐서플로우에서 DQN 강화학습으로 캐치게임 구현 file 깊은바다 2017-03-18 1854