본문 바로가기
인공지능/강화학습

REINFORCE 알고리즘을 이용한 CartPole 강화학습

by JaeminRyu 2023. 6. 10.
728x90

1.배경

인공지능은 간단하게 보면 함수를 만드는 것.
입력층 -> 은닉층 -> 출력층
 
목표 : CartPole 게임을 오래 하는 인공지능 만들기

->DQN에서는 state를 입력하면 어느 행동을 하는 것이 좋은지 행동가치 함수를 만들었음

Reinforce 알고리즘 에서는 각 행동을 할 확률을 출력하도록 만드는 것이 목표.

즉 인공신경망 자체가 어떤 행동을 할지 결정하는 정책함수가 됨.

 

학습을 위해서 해당 정책이 얼마나 좋은지 평가해야함. -> 정책목적함수 정의

정책목적함수 J(θ) : θ라는 파라미터를 가진 정책함수 π를 평가함.

정책목적함수만 정의하면 경사상승법을 사용해서 정책목적함수를 극대화하는 방향으로 파라미터를 학습을 하게 만들면 됨.

 

reinforce 알고리즘에서는 정책목적함수로 MDP의 가치평가 함수를 사용함.

 

가치평가함수 v_π(s)

v_π(s)는 정책 π를 따랐을 때 얻을 수 있는 반환값의 기대값을 출력함. 

이때 식을 one-step 으로만 살펴보면 (감가율을 0이라고 하면)

v_π(s) = 모든 행동 A에 대한 π(a|s)*R(s,a)의 합이다.

즉 이를 풀어서 쓰면 v_π(s)는 모든 행동에 대해서 (상태 s에서 a를 고를 확률) * (상태s에서 a를 골랐을 때의 보상) 의 합이다.

가치 평가 함수를 극대화 하는 방향으로 θ를 변경한다

= π를 따랐을 때 얻을 수 있는 반환값의 기대값을 극대화 시키는 방향으로 π를 학습시킨다

= 정책목적함수로 사용하는데 타당하다.

따라서 가치평가함수(v(s))를 정책목적함수(J(θ))로 사용할 수 있다.

이때 정책목적함수 J(θ)의 gradient는

J(θ) = 모든 행동 A에 대한 ∇π(a|s)*R(s,a)의 합

으로 나타낼 수 있는데, 이는 우도 비율 개념을 사용하면 

모든 행동 A에 대한 π(a|s)∇log(π(a|s))*R(s,a)의 합으로 변경할 수 있다고 하는데, 사실 우도 비율이 뭔지 몰라서 식 유도는 생략 하겠다.

이는 다시 E[∇log(π(a|s))*R(s,a)]의 기대값으로 나타낼 수 있고,  (π(a|s)의 출력은 s에서 a를 할 확률 분포이다.)

마지막으로 SGD의 샘플링을 이용하여 E[∇log(π(a|s))*r(실제 보상)] 로 나타낼 수 있다.

이때 R은 모든 상태와 행동에 대한 보상의 집합을 의미하는데, r은 a와 s에 대한 실제 보상만을 의미한다.

이때 즉각적인 보상 r또한 구하기 쉽지 않기 때문에, 에피소드를 끝난 뒤의 실제 반환값 G_t를 대신해서 사용할 수 있다.

이때 G_t를 곱하는 방법은 MC방식이며 REINFORCE 알고리즘이라고 한다.

따라서 마지막 식으로는

 J(θ) = E[∇log(π(a|s))*G_t] 가 되며

최종적인 비용함수는 - log(출력) * G_t 가 된다.비용함수로 변환될 때 마이너스를 붙이는 이유는, optimizer를 코드에서 돌릴때 기본적으로 경사 하강법을 사용한다. 이때 마이너스를 붙이게 되면, 해당 값이 올라가는 방향으로 이동하므로 경사 상승법을 사용하는 효과를 얻게 된다.

 

반환값을 계산하는 방법은

G_t = (출력값_t * r_t) + 감가율*(출력값_t+1 * r_t+1) + 감가율^2*(출력값_t+2 * r_t+2) + ......

즉 반환값은 한 에피소드에서 일어난 모든 출력값 * 보상들의 합이다.

 

출력층

우리의 목적은 state를 입력하면 각 행동을 할 확률을 출력하는 모델을 만들어야 한다.

이때 출력층으로는 소프트맥스 함수를 사용한다.

소프트맥스 함수는 모든 출력값의 합이 1이고, 각 출력값들은 0~1 사이의 값으로 반환되므로 각 행동을 할 확률로 사용할 수 있다.

2.학습 흐름

1.인공신경망에 state 입력

2.action을 출력함(softmax 출력층)

3.그중에서 a_t를 확률분포에 맞게 골라서 수행

4.보상(r_t)과 상태 (s_t+1)이 반환됨 -> 보상r_t는 계속해서 Learning Data(반환값)에 저장

5.에피소드가 안끝났으면 다시 1부터 계속 반복해서 에피소드를 끝냄

6.에피소드가 끝났으면 Learning Data에 있는 함수 결괏값(출력값)과 반환값(G_t)를 비용 함수에 입력해서 값을 계산

7.비용 함수를 경사하강법을 통해 최소화하는 방향으로 학습.

 

3.코드

import gym
import tensorflow as tf
import numpy as np
import random
from collections import deque
from keras.optimizers import Adam
import tensorflow_probability as tfp
import matplotlib.pyplot as plt

env = gym.make('CartPole-v0') # 카트폴이라는 환경 받아오기


def loss_func(prob,action,G):
  #손실함수 : action할 확률에 로그를 취하고 G를 곱함
  dist = tfp.distributions.Categorical(probs=prob, dtype=tf.float32)  #분포로 변경
  log_prob = dist.log_prob(action)  #action할 확률에 로그를 취함
  loss = -log_prob * G  #loss 계산

  return loss

#모델 만들기. 4 * 24 * 24 * 2
model = tf.keras.models.Sequential([
  tf.keras.layers.Dense(24, input_dim=4, activation=tf.nn.relu),
  tf.keras.layers.Dense(24, activation=tf.nn.relu),
  tf.keras.layers.Dense(2, activation='softmax')
])


#하이퍼 파라미터
episode_number = 1000 #에피소드 개수
learning_rate = 0.0005
optimizer = Adam(learning_rate)
discount_factor = 0.9

#그래프
GRAPH_Y = []

for episode in range(episode_number): #episode_number만큼 episode 반복
  #점수, 상태 배열, 행동 배열, 보상 배열 초기화
  score = 0 
  states = []
  actions = []
  rewards = []

  state = env.reset()   #환경 초기화

  done = False
  while not done:       #게임 끝날때 까지 반복
    # 정책에 따라 행동 선택
    prob = model.predict(np.array([state]),verbose = 0)   #출력값(action할 확률) 받아오기
    dist = tfp.distributions.Categorical(probs=prob,dtype=tf.float32)
    action = int(dist.sample().numpy()[0])      #확률별로 랜덤하게 샘플(action)을 고름

    next_state, reward, done, _ = env.step(action)      #한 스텝 진행
    #상태,행동,보상 저장
    states.append(state)
    actions.append(action)
    rewards.append(reward)
    state = next_state
    score += 1

  # 에피소드 종료 후 학습
  returns = []
  G = 0   #리턴값
  #G가 귀납적으로 정의되므로, rewards를 뒤집어서 마지막 G_n 부터 구함 
  for r in reversed(rewards):
    G = r + discount_factor * G
    returns.insert(0,G)
  #print(returns)

  #states,actions,returns 값을 사용해서 학습 시작
  for S,A,G in zip(states,actions,returns):
    with tf.GradientTape() as tape:
      S = np.reshape(S,[1,4])   #state 정규화
      p = model(S)              #모델 출력값 구하기
      loss = loss_func(p,A,G)   #손실 함수 계산
      
    gradients = tape.gradient(loss, model.trainable_variables)    #경사 계산
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))  #경사 적용

  
  GRAPH_Y.append(score)
  if episode % 50 == 0:
    print("Episode:", episode, "Score:",score)  

    
print("Episode:", episode, "Total Reward:", score)  
plt.plot(GRAPH_Y)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.show()