1. Basic Scenario
Unlike the earlier 1D problem, this task extends the problem to 2D. As shown in the figure below, only three combat units are considered: the blue-side fighter (Fighter), the red-side target (Target), and the red-side surface-to-air missile system (SAM).
- Objective: train the Fighter to learn, at minimum time cost, a heuristic policy for flying around the SAM's missile engagement envelope, avoiding its fire, and then attacking and destroying the Target directly.
- Constraints: the initial positions of the Fighter, SAM, and Target are randomized, but their relative bearings should essentially satisfy the relationship shown in the figure above. The table below gives the initial positions of the three units and the engagement ranges of the Fighter and the SAM:
- Control: the only action variable is the fighter's next heading at each time step, a continuous control variable in $[0, 2\pi]$.
- Assumptions: the agent chooses its action $a_t$ based only on the fighter's ideal position in the current state $s_t$, independent of the earlier states $s_i\ (i = 0, 1, \dots, t-1)$. In the original paper the Fighter's speed is fixed to a constant to limit the size of the state space; in our extension we also consider adding the Fighter's acceleration to the action space. The SAM has partial omnidirectional detection, i.e., it can only detect the Fighter once the Fighter falls inside its engagement range, whereas the Fighter fully observes the SAM's state.
- Platform: the original paper uses AFGYM/AFSIM as the base platform. Q-Learning is no longer used as the solver, since compared with policy-gradient methods it has no advantage on high-dimensional state spaces; when the behavior was trained with Q-Learning plus REINFORCE, the result was poor, essentially a random walk. GANs are also unsuitable because decisions must be made in real time. The paper therefore trains the original problem with A3C and PPO.
2. Basic Model
2.1 State Space and Action Space
The state vector $s_t$ of the original scenario consists of the following 11-dimensional vector:
Note: during training, all state components except the 0-1 variables are normalized; otherwise the backpropagation of gradients would be hampered. In the code below, SAM pointing and Target heading are not considered for now, so the state space is compressed to 9 dimensions; this is one concrete direction for later improvement. The Fighter heading serves as the basic action $a_t$.
Therefore the final state space of $s_t$ is $S = [0,1]^{8}$, and the action space of $a_t$ is $A = [0,1]$, which is mapped onto the heading range $[-180^{\circ}, 180^{\circ}]$.
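To make the mapping concrete, here is a minimal sketch (my addition, not from the original paper; the arena size `L_LIMITS` and the per-component scaling are assumptions, since the original 11-component state table is not reproduced here) of how a normalized action in $[0,1]$ is turned into a heading and how position components are squeezed into $[0,1]$:

```python
import numpy as np

L_LIMITS = 100.0  # assumed size of the square arena, matching the default in the code below


def action_to_heading(a):
    """Map a normalized action a in [0, 1] to a heading angle in [-180°, 180°], in radians."""
    return (2.0 * a - 1.0) * np.pi


def normalize_position(x, y):
    """Scale raw coordinates into [0, 1] so all state components share the same range."""
    return x / L_LIMITS, y / L_LIMITS


print(np.degrees(action_to_heading(0.0)))   # -180.0
print(np.degrees(action_to_heading(0.5)))   #    0.0
print(normalize_position(50.0, 25.0))       # (0.5, 0.25)
```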
2.2 Reward Function
Each time the Fighter flies one time step $\Delta t$: if it is not inside the SAM's engagement range, it receives a reward of -1 and the episode continues; if it enters the SAM's engagement range, it receives -50 and the episode continues; if it is inside the SAM's engagement range and is shot down by the SAM, it receives -1000 and the episode ends; if it comes near the Target, it receives 50 and the episode continues; if it comes near the Target and successfully strikes it, it receives 1000 and the episode ends.
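The scheme above can be condensed into a small helper function. This is only a sketch of the rules as stated; note that the environment code later in this post folds the logic into its `step()` method and, as written, does not implement the intermediate -50 penalty:

```python
def step_reward(in_sam_range, shot_down, near_target, target_destroyed):
    """Return (reward, done) for one time step, following the scheme described above."""
    if shot_down:            # inside the SAM range and shot down
        return -1000, True
    if target_destroyed:     # near the Target and the strike succeeds
        return 1000, True
    if near_target:          # merely near the Target
        return 50, False
    if in_sam_range:         # inside the SAM engagement range
        return -50, False
    return -1, False         # ordinary flight for one time step
```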
2.3 State Transition Equations
The model only considers the Fighter moving at constant speed; adding a penetration (dash) effect later could be a breakthrough research direction (2).
Assume the Fighter's speed is a constant $v$, its heading at time $t$ is $a_t$ (the action variable), and its position is $(x_J^{(t)}, y_J^{(t)})$. Then:

$$
x_J^{(t+1)} = x_J^{(t)} + v\,\Delta t \cos a_t \\
y_J^{(t+1)} = y_J^{(t)} + v\,\Delta t \sin a_t
$$
Let the Fighter's ammunition at time $t$ be $w_t$; if the Fighter strikes a target at time $t$, then $w_{t+1} = w_t - dw_t$. Here we do not model ammunition consumption and assume ammunition is plentiful; this can also serve as breakthrough research direction (3). Let $R$ be the SAM's engagement region, taken here to be a circle centered on the SAM with radius $R_{SAM}$, the SAM's weapon engagement radius.
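A direct transcription of the transition equations, as a standalone sketch (the full environment class in the next section adds boundary clipping and the SAM/Target logic; the default speed and time step mirror the values used there):

```python
import numpy as np


def transition(x, y, heading, v=2175.0/3600, delta_t=10):
    """One kinematic step: constant speed v, heading a_t in radians."""
    x_next = x + v * delta_t * np.cos(heading)
    y_next = y + v * delta_t * np.sin(heading)
    return x_next, y_next


# Example: fly north-east (45 degrees) for one time step from the origin
print(transition(0.0, 0.0, np.pi / 4))
```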
3. Environment Setup
The code is given directly below:
```python
import numpy as np


class Fighter:
    def __init__(self, L_limits=100, x=0, y=0, SAM_x=50, SAM_y=50,
                 target_x=70, target_y=80, velocity=2175.0/3600,
                 delta_t=10, dead=False, SAM_range=40, SAM_time=60,
                 Fighter_range=10, firing_range=3.5, weapon_count=20):
        self.L_limits = L_limits            # boundary of the square arena
        self.x = x                          # fighter x coordinate (longitude)
        self.y = y                          # fighter y coordinate (latitude)
        self.SAM_x = SAM_x
        self.SAM_y = SAM_y
        self.target_x = target_x
        self.target_y = target_y
        # the attributes above describe positions
        self.SAM_disable = False
        self.SAM_time = SAM_time            # SAM preparation (reaction) time
        self.SAM_range = SAM_range
        self.firing_range = firing_range    # fighter weapon range
        self.weapon_count = weapon_count    # fighter ammunition capacity
        self.velocity = velocity
        self.delta_t = delta_t
        self.dead = dead
        self.wait_time = 0
        self.burn_reward = -1000
        self.win_reward = 1000
        self.reward = -1                    # reward for normal flight

    # reset the fighter to a random initial position
    def reset(self):
        self.x = self.L_limits*np.random.random()
        self.y = self.L_limits*np.random.random()
        self.dead = False
        self.wait_time = 0
        self.SAM_disable = False
        return self.x, self.y

    # state transition equation (headings are in radians)
    def step(self, fighter_heading):
        fighter_heading = np.clip(fighter_heading, -np.pi, np.pi)
        self.x = self.x + self.velocity * self.delta_t*np.cos(fighter_heading)
        self.y = self.y + self.velocity * self.delta_t*np.sin(fighter_heading)
        # limit the fighter's horizontal position
        if self.x <= 0:
            self.x = 0
        elif self.x > self.L_limits:
            self.x = self.L_limits
        # limit the fighter's vertical position
        if self.y <= 0:
            self.y = 0
        elif self.y > self.L_limits:
            self.y = self.L_limits
        # check whether the fighter is inside the SAM engagement range
        d = np.sqrt((self.x - self.SAM_x)**2 + (self.y - self.SAM_y)**2)
        if self.SAM_disable == False:
            if d <= self.SAM_range:
                self.wait_time = self.wait_time + self.delta_t
            else:
                # the timer resets once the fighter leaves the SAM range
                self.wait_time = 0
        # if the fighter has lingered inside the SAM range long enough,
        # it is shot down and a large negative reward is returned
        if self.wait_time >= self.SAM_time:
            self.wait_time = 0
            return (self.x, self.y), self.burn_reward, True
        # when close enough, the fighter destroys the SAM immediately (a key waypoint, reward 50)
        if d <= self.firing_range and self.SAM_disable == False:
            self.SAM_disable = True
            self.wait_time = 0
            return (self.x, self.y), 50, False
        # when close enough to the Target, the episode ends with a win
        if np.sqrt((self.target_x - self.x)**2 + (self.target_y - self.y)**2) <= self.firing_range:
            return (self.x, self.y), self.win_reward, True
        # otherwise normal flight: reward -1
        return (self.x, self.y), -1, False
        # RL could perhaps be combined with fault-tree analysis for policy analysis!!!
```
The following test code drives the environment with a random walk:
```python
import matplotlib.pyplot as plt

x_array, y_array = [], []
rewards = 0.0
SAM_x, SAM_y = 50, 50
target_x, target_y = 80, 70
SAM_range = 20
fighter = Fighter(SAM_x=SAM_x, SAM_y=SAM_y, target_x=target_x, target_y=target_y, SAM_range=SAM_range)
while True:
    fighter_heading = np.random.random()*2*np.pi - np.pi
    next_state, reward, done = fighter.step(fighter_heading)
    rewards += reward
    print("Current fighter position [{}, {}], reward {}, cumulative reward {}".format(
        next_state[0], next_state[1], reward, rewards))
    x_array.append(next_state[0])
    y_array.append(next_state[1])
    if done:
        break

# draw the SAM engagement range as a circle
circle_x, circle_y = [], []
for theta in np.linspace(-np.pi, np.pi):
    circle_x.append(SAM_x + SAM_range*np.cos(theta))
    circle_y.append(SAM_y + SAM_range*np.sin(theta))
plt.figure(figsize=(5, 5))
plt.plot(x_array, y_array, 'b.-', linewidth=1)
plt.plot(SAM_x, SAM_y, 'ro', linewidth=5)
plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
plt.plot(target_x, target_y, 'go', linewidth=5)
plt.title('one road')
plt.show()
```
The test results are shown below.
The following run is one where the fighter is shot down by the SAM:
The following run is one where the fighter successfully skirts the SAM:
4. Implementation
The problem above is solved with continuous PPO plus the optimization tricks; the following libraries and modules need to be imported:
```python
import torch
import matplotlib.pyplot as plt
import numpy as np
from normalization import Normalization, RewardScaling
from replaybuffer import ReplayBuffer
from ppo_continuous import PPO_continuous
from Fighter_env import Fighter
from CPPO_main import args_param, train_network, test_network
```
The problem is then solved with the following main routine:
```python
# declare the hyper-parameters and the environment
args = args_param(max_episode_steps=2000, batch_size=64, max_train_steps=5000, K_epochs=3)
env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
env.test(show_pictures=True, show_tips=False)
# train the agent, then run one test episode with it
agent = train_network(args, env)
x_array, y_array, _ = test_network(args, env, agent)
```
This yields the following convergence curve of the average reward:
and the following test result:
It seems that training the network with continuous-action PPO is still quite problematic!!!
5. Code
- Save the following code in normalization.py; it implements the normalization utilities:
```python
import numpy as np


class RunningMeanStd:
    # Dynamically calculate mean and std
    def __init__(self, shape):  # shape: the dimension of the input data
        self.n = 0
        self.mean = np.zeros(shape)
        self.S = np.zeros(shape)
        self.std = np.sqrt(self.S)

    def update(self, x):
        x = np.array(x)
        self.n += 1
        if self.n == 1:
            self.mean = x
            self.std = x
        else:
            old_mean = self.mean.copy()
            self.mean = old_mean + (x - old_mean) / self.n
            self.S = self.S + (x - old_mean) * (x - self.mean)
            self.std = np.sqrt(self.S / self.n)


class Normalization:
    def __init__(self, shape):
        self.running_ms = RunningMeanStd(shape=shape)

    def __call__(self, x, update=True):
        # Whether to update the mean and std; during evaluation, update=False
        if update:
            self.running_ms.update(x)
        x = (x - self.running_ms.mean) / (self.running_ms.std + 1e-8)
        return x


class RewardScaling:
    def __init__(self, shape, gamma):
        self.shape = shape  # reward shape = 1
        self.gamma = gamma  # discount factor
        self.running_ms = RunningMeanStd(shape=self.shape)
        self.R = np.zeros(self.shape)

    def __call__(self, x):
        self.R = self.gamma * self.R + x
        self.running_ms.update(self.R)
        x = x / (self.running_ms.std + 1e-8)  # Only divide by std
        return x

    def reset(self):  # When an episode is done, we should reset 'self.R'
        self.R = np.zeros(self.shape)
```
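As a quick sanity check, the two helpers can be exercised on dummy data. This usage sketch is my addition, not part of the original code; the shapes and numbers are placeholders:

```python
import numpy as np
from normalization import Normalization, RewardScaling

state_norm = Normalization(shape=2)            # normalize 2-D states (x, y)
reward_scaling = RewardScaling(shape=1, gamma=0.99)

s = np.array([10.0, 20.0])
print(state_norm(s))                           # first call: centered exactly to zero
print(state_norm(np.array([30.0, 5.0])))       # running mean/std are updated on every call

reward_scaling.reset()                         # reset the running return at episode start
print(reward_scaling(-1.0))                    # scaled reward (the first update sets std to the sample itself)
```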
- Write the following code in Fighter_env.py; it defines the fighter environment:
```python
import numpy as np
import matplotlib.pyplot as plt


# The class below defines the environment
class Fighter:
    def __init__(self, L_limits=100, x=0, y=0, SAM_x=50, SAM_y=50,
                 target_x=70, target_y=80, velocity=2175.0/3600,
                 delta_t=10, dead=False, SAM_range=40, SAM_time=60,
                 Fighter_range=10, weapon_count=20):
        self.L_limits = L_limits            # boundary of the square arena
        self.x = x                          # fighter x coordinate (longitude)
        self.y = y                          # fighter y coordinate (latitude)
        self.SAM_x = SAM_x
        self.SAM_y = SAM_y
        self.target_x = target_x
        self.target_y = target_y
        # the attributes above describe positions
        self.SAM_disable = False
        self.SAM_time = SAM_time            # SAM preparation (reaction) time
        self.SAM_range = SAM_range
        self.firing_range = Fighter_range   # fighter weapon range
        self.weapon_count = weapon_count    # fighter ammunition capacity
        self.velocity = velocity
        self.delta_t = delta_t
        self.dead = dead
        self.wait_time = 0
        self.burn_reward = -1000
        self.win_reward = 1000
        self.reward = -1                    # reward for normal flight
        # basic gym-like attributes
        self.observation_space = np.array([[0, self.L_limits], [0, self.L_limits]])
        self.action_space = np.array([[-np.pi, np.pi]])

    # reset the fighter's initial position
    def reset(self):
        # self.x = self.L_limits*np.random.random()
        # self.y = self.L_limits*np.random.random()
        self.x = 0
        self.y = 0
        self.dead = False
        self.wait_time = 0
        self.SAM_disable = False
        return np.array([self.x, self.y], dtype=np.float32)

    # state transition equation (headings are in radians)
    def step(self, fighter_heading):
        fighter_heading = np.clip(fighter_heading, -np.pi, np.pi)
        self.x = self.x + self.velocity * self.delta_t*np.cos(fighter_heading)
        self.y = self.y + self.velocity * self.delta_t*np.sin(fighter_heading)
        self.x = np.clip(self.x, a_min=0.0, a_max=self.L_limits)
        self.y = np.clip(self.y, a_min=0.0, a_max=self.L_limits)
        # when close enough to the Target, the episode ends with a win
        if np.linalg.norm(np.array([self.target_x - self.x, self.target_y - self.y], dtype=np.float32)) <= self.firing_range:
            return np.array([self.x, self.y], dtype=np.float32), self.win_reward, True
        # check whether the fighter is inside the SAM engagement range
        d = np.linalg.norm(np.array([self.x - self.SAM_x, self.y - self.SAM_y], dtype=np.float32))
        if self.SAM_disable == False:
            if d <= self.SAM_range:
                self.wait_time = self.wait_time + self.delta_t
            else:
                # the timer resets once the fighter leaves the SAM range
                self.wait_time = 0
        # if the fighter has lingered inside the SAM range long enough,
        # it is shot down and a large negative reward is returned
        if self.wait_time >= self.SAM_time:
            self.wait_time = 0
            return np.array([self.x, self.y], dtype=np.float32), self.burn_reward, True
        # when close enough, the fighter destroys the SAM immediately (a key waypoint, reward 50)
        if d <= self.firing_range and self.SAM_disable == False:
            self.SAM_disable = True
            self.wait_time = 0
            return np.array([self.x, self.y], dtype=np.float32), 50, False
        # otherwise normal flight: reward -1
        return np.array([self.x, self.y], dtype=np.float32), -1, False
        # RL could perhaps be combined with fault-tree analysis for policy analysis!!!

    # run one test episode with a random-walk policy
    def test(self, show_tips=True, show_pictures=True):
        x_array, y_array = [], []
        rewards = 0.0
        while True:
            fighter_headings = np.random.random()*2*np.pi - np.pi
            next_state, reward, done = self.step(fighter_headings)
            rewards += reward
            if show_tips:
                print("Current fighter position [{}, {}], reward {}, cumulative reward {}".format(
                    next_state[0], next_state[1], reward, rewards))
            x_array.append(next_state[0])
            y_array.append(next_state[1])
            if done:
                break
        # if requested, plot the trajectory together with the SAM engagement circle
        if show_pictures:
            circle_x, circle_y = [], []
            for theta in np.linspace(-np.pi, np.pi):
                circle_x.append(self.SAM_x + self.SAM_range * np.cos(theta))
                circle_y.append(self.SAM_y + self.SAM_range * np.sin(theta))
            plt.figure(figsize=(5, 5))
            plt.plot(x_array, y_array, 'b.-', linewidth=1)
            plt.plot(self.SAM_x, self.SAM_y, 'ro', linewidth=5)
            plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
            plt.plot(self.target_x, self.target_y, 'go', linewidth=5)
            plt.plot(x_array[-1], y_array[-1], 'm*', linewidth=5)
            plt.title('one road')
            plt.show()
```
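Before plugging the environment into PPO, it can help to drive it by hand for a few steps and confirm the interface that `train_network` expects. This quick check is my addition, not part of the original post:

```python
import numpy as np
from Fighter_env import Fighter

env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
s = env.reset()                                        # np.array([x, y], dtype=float32)
print("state dim:", env.observation_space.shape[0])    # 2
print("action dim:", env.action_space.shape[0])        # 1

for _ in range(5):
    heading = np.random.uniform(-np.pi, np.pi)         # random heading in radians
    s, r, done = env.step(heading)                     # (next_state, reward, done)
    print(s, r, done)
    if done:
        break
```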
- Store the ReplayBuffer class in replaybuffer.py:
```python
import torch
import numpy as np


class ReplayBuffer:
    def __init__(self, args):
        self.state_dim = args.state_dim
        self.action_dim = args.action_dim
        self.batch_size = args.batch_size
        self.s = np.zeros((args.batch_size, args.state_dim))
        self.a = np.zeros((args.batch_size, args.action_dim))
        self.a_logprob = np.zeros((args.batch_size, args.action_dim))
        self.r = np.zeros((args.batch_size, 1))
        self.s_ = np.zeros((args.batch_size, args.state_dim))
        self.dw = np.zeros((args.batch_size, 1))
        self.done = np.zeros((args.batch_size, 1))
        self.count = 0

    def store(self, s, a, a_logprob, r, s_, dw, done):
        self.s[self.count] = s
        self.a[self.count] = a
        self.a_logprob[self.count] = a_logprob
        self.r[self.count] = r
        self.s_[self.count] = s_
        self.dw[self.count] = dw
        self.done[self.count] = done
        self.count += 1

    def numpy_to_tensor(self):
        s = torch.tensor(self.s, dtype=torch.float)
        a = torch.tensor(self.a, dtype=torch.float)
        a_logprob = torch.tensor(self.a_logprob, dtype=torch.float)
        r = torch.tensor(self.r, dtype=torch.float)
        s_ = torch.tensor(self.s_, dtype=torch.float)
        dw = torch.tensor(self.dw, dtype=torch.float)
        done = torch.tensor(self.done, dtype=torch.float)
        return s, a, a_logprob, r, s_, dw, done
```
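A minimal usage sketch of the buffer (my addition; the `Args` stand-in is hypothetical and only carries the three fields the buffer reads):

```python
import numpy as np
from replaybuffer import ReplayBuffer


class Args:  # hypothetical stand-in for args_param
    state_dim, action_dim, batch_size = 2, 1, 4


buf = ReplayBuffer(Args())
for _ in range(Args.batch_size):
    buf.store(s=np.zeros(2), a=np.zeros(1), a_logprob=np.zeros(1),
              r=-1.0, s_=np.ones(2), dw=False, done=False)

s, a, a_logprob, r, s_, dw, done = buf.numpy_to_tensor()
print(s.shape, a.shape, r.shape)   # torch.Size([4, 2]) torch.Size([4, 1]) torch.Size([4, 1])
buf.count = 0                      # the training loop resets count after each update
```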
- The file ppo_continuous.py contains the continuous PPO algorithm with the optimization tricks:
```python
import torch
import torch.nn.functional as F
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
import torch.nn as nn
from torch.distributions import Beta, Normal


# Trick 8: orthogonal initialization
def orthogonal_init(layer, gain=1.0):
    nn.init.orthogonal_(layer.weight, gain=gain)
    nn.init.constant_(layer.bias, 0)


class Actor_Beta(nn.Module):
    def __init__(self, args):
        super(Actor_Beta, self).__init__()
        self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
        self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
        self.alpha_layer = nn.Linear(args.hidden_width, args.action_dim)
        self.beta_layer = nn.Linear(args.hidden_width, args.action_dim)
        self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh]  # Trick 10: use tanh
        if args.use_orthogonal_init:
            print("------use_orthogonal_init------")
            orthogonal_init(self.fc1)
            orthogonal_init(self.fc2)
            orthogonal_init(self.alpha_layer, gain=0.01)
            orthogonal_init(self.beta_layer, gain=0.01)

    def forward(self, s):
        s = self.activate_func(self.fc1(s))
        s = self.activate_func(self.fc2(s))
        # alpha and beta need to be larger than 1, so we use 'softplus' as the activation function and then add 1
        alpha = F.softplus(self.alpha_layer(s)) + 1.0
        beta = F.softplus(self.beta_layer(s)) + 1.0
        return alpha, beta

    def get_dist(self, s):
        alpha, beta = self.forward(s)
        dist = Beta(alpha, beta)
        return dist

    def mean(self, s):
        alpha, beta = self.forward(s)
        mean = alpha / (alpha + beta)  # The mean of the Beta distribution
        return mean


class Actor_Gaussian(nn.Module):
    def __init__(self, args):
        super(Actor_Gaussian, self).__init__()
        self.max_action = args.max_action
        self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
        self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
        self.mean_layer = nn.Linear(args.hidden_width, args.action_dim)
        self.log_std = nn.Parameter(torch.zeros(1, args.action_dim))  # We use 'nn.Parameter' to train log_std automatically
        self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh]  # Trick 10: use tanh
        if args.use_orthogonal_init:
            print("------use_orthogonal_init------")
            orthogonal_init(self.fc1)
            orthogonal_init(self.fc2)
            orthogonal_init(self.mean_layer, gain=0.01)

    def forward(self, s):
        s = self.activate_func(self.fc1(s))
        s = self.activate_func(self.fc2(s))
        mean = self.max_action * torch.tanh(self.mean_layer(s))  # [-1,1] -> [-max_action, max_action]
        return mean

    def get_dist(self, s):
        mean = self.forward(s)
        log_std = self.log_std.expand_as(mean)  # To make 'log_std' have the same dimension as 'mean'
        std = torch.exp(log_std)  # We train the 'log_std' to ensure std = exp(log_std) > 0
        dist = Normal(mean, std)  # Get the Gaussian distribution
        return dist


class Critic(nn.Module):
    def __init__(self, args):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
        self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
        self.fc3 = nn.Linear(args.hidden_width, 1)
        self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh]  # Trick 10: use tanh
        if args.use_orthogonal_init:
            print("------use_orthogonal_init------")
            orthogonal_init(self.fc1)
            orthogonal_init(self.fc2)
            orthogonal_init(self.fc3)

    def forward(self, s):
        s = self.activate_func(self.fc1(s))
        s = self.activate_func(self.fc2(s))
        v_s = self.fc3(s)
        return v_s


class PPO_continuous():
    def __init__(self, args):
        self.policy_dist = args.policy_dist
        self.max_action = args.max_action
        self.batch_size = args.batch_size
        self.mini_batch_size = args.mini_batch_size
        self.max_train_steps = args.max_train_steps
        self.lr_a = args.lr_a  # Learning rate of actor
        self.lr_c = args.lr_c  # Learning rate of critic
        self.gamma = args.gamma  # Discount factor
        self.lamda = args.lamda  # GAE parameter
        self.epsilon = args.epsilon  # PPO clip parameter
        self.K_epochs = args.K_epochs  # PPO parameter
        self.entropy_coef = args.entropy_coef  # Entropy coefficient
        self.set_adam_eps = args.set_adam_eps
        self.use_grad_clip = args.use_grad_clip
        self.use_lr_decay = args.use_lr_decay
        self.use_adv_norm = args.use_adv_norm

        if self.policy_dist == "Beta":
            self.actor = Actor_Beta(args)
        else:
            self.actor = Actor_Gaussian(args)
        self.critic = Critic(args)

        if self.set_adam_eps:  # Trick 9: set Adam epsilon=1e-5
            self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a, eps=1e-5)
            self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c, eps=1e-5)
        else:
            self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a)
            self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c)

    def evaluate(self, s):  # When evaluating the policy, we only use the mean
        s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0)
        if self.policy_dist == "Beta":
            a = self.actor.mean(s).detach().numpy().flatten()
        else:
            a = self.actor(s).detach().numpy().flatten()
        return a

    def choose_action(self, s):
        s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0)
        if self.policy_dist == "Beta":
            with torch.no_grad():
                dist = self.actor.get_dist(s)
                a = dist.sample()  # Sample the action according to the probability distribution
                a_logprob = dist.log_prob(a)  # The log probability density of the action
        else:
            with torch.no_grad():
                dist = self.actor.get_dist(s)
                a = dist.sample()  # Sample the action according to the probability distribution
                a = torch.clamp(a, -self.max_action, self.max_action)  # [-max, max]
                a_logprob = dist.log_prob(a)  # The log probability density of the action
        return a.numpy().flatten(), a_logprob.numpy().flatten()

    def update(self, replay_buffer, total_steps):
        s, a, a_logprob, r, s_, dw, done = replay_buffer.numpy_to_tensor()  # Get training data
        """
        Calculate the advantage using GAE
        'dw=True' means dead or win, there is no next state s'
        'done=True' represents the terminal of an episode (dead or win or reaching max_episode_steps).
        When calculating the adv, if done=True, gae=0
        """
        adv = []
        gae = 0
        with torch.no_grad():  # adv and v_target have no gradient
            vs = self.critic(s)
            vs_ = self.critic(s_)
            deltas = r + self.gamma * (1.0 - dw) * vs_ - vs
            for delta, d in zip(reversed(deltas.flatten().numpy()), reversed(done.flatten().numpy())):
                gae = delta + self.gamma * self.lamda * gae * (1.0 - d)
                adv.insert(0, gae)
            adv = torch.tensor(adv, dtype=torch.float).view(-1, 1)
            v_target = adv + vs
            if self.use_adv_norm:  # Trick 1: advantage normalization
                adv = ((adv - adv.mean()) / (adv.std() + 1e-5))

        # Optimize policy for K epochs:
        for _ in range(self.K_epochs):
            # Random sampling with no repetition. 'False' indicates that training will continue
            # even if the number of samples in the last mini-batch is less than mini_batch_size
            for index in BatchSampler(SubsetRandomSampler(range(self.batch_size)), self.mini_batch_size, False):
                dist_now = self.actor.get_dist(s[index])
                dist_entropy = dist_now.entropy().sum(1, keepdim=True)  # shape (mini_batch_size, 1)
                a_logprob_now = dist_now.log_prob(a[index])
                # a/b = exp(log(a)-log(b)); in a multi-dimensional continuous action space we need to sum up the log_prob
                ratios = torch.exp(a_logprob_now.sum(1, keepdim=True) - a_logprob[index].sum(1, keepdim=True))  # shape (mini_batch_size, 1)

                surr1 = ratios * adv[index]  # Only calculate the gradient of 'a_logprob_now' in ratios
                surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * adv[index]
                actor_loss = -torch.min(surr1, surr2) - self.entropy_coef * dist_entropy  # Trick 5: policy entropy
                # Update actor
                self.optimizer_actor.zero_grad()
                actor_loss.mean().backward()
                if self.use_grad_clip:  # Trick 7: gradient clip
                    torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
                self.optimizer_actor.step()

                v_s = self.critic(s[index])
                critic_loss = F.mse_loss(v_target[index], v_s)
                # Update critic
                self.optimizer_critic.zero_grad()
                critic_loss.backward()
                if self.use_grad_clip:  # Trick 7: gradient clip
                    torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
                self.optimizer_critic.step()

        if self.use_lr_decay:  # Trick 6: learning rate decay
            self.lr_decay(total_steps)

    def lr_decay(self, total_steps):
        lr_a_now = self.lr_a * (1 - total_steps / self.max_train_steps)
        lr_c_now = self.lr_c * (1 - total_steps / self.max_train_steps)
        for p in self.optimizer_actor.param_groups:
            p['lr'] = lr_a_now
        for p in self.optimizer_critic.param_groups:
            p['lr'] = lr_c_now
```
- The following code, saved as CPPO_main.py, trains and tests the network:
```python
import matplotlib.pyplot as plt
import numpy as np
from normalization import Normalization, RewardScaling
from replaybuffer import ReplayBuffer
from ppo_continuous import PPO_continuous
from Fighter_env import Fighter


# A parameter class that stores the hyper-parameters
class args_param(object):
    def __init__(self, max_train_steps=int(3e6),
                 evaluate_freq=5e3,
                 save_freq=20,
                 policy_dist="Gaussian",
                 batch_size=2048,
                 mini_batch_size=64,
                 hidden_width=64,
                 lr_a=3e-4,
                 lr_c=3e-4,
                 gamma=0.99,
                 lamda=0.95,
                 epsilon=0.2,
                 K_epochs=10,
                 max_episode_steps=1000,
                 use_adv_norm=True,
                 use_state_norm=True,
                 use_reward_norm=False,
                 use_reward_scaling=True,
                 entropy_coef=0.01,
                 use_lr_decay=True,
                 use_grad_clip=True,
                 use_orthogonal_init=True,
                 set_adam_eps=True,
                 use_tanh=True):
        self.max_train_steps = max_train_steps
        self.evaluate_freq = evaluate_freq
        self.save_freq = save_freq
        self.policy_dist = policy_dist
        self.batch_size = batch_size
        self.mini_batch_size = mini_batch_size
        self.hidden_width = hidden_width
        self.lr_a = lr_a
        self.lr_c = lr_c
        self.gamma = gamma
        self.lamda = lamda
        self.epsilon = epsilon
        self.K_epochs = K_epochs
        self.use_adv_norm = use_adv_norm
        self.use_state_norm = use_state_norm
        self.use_reward_norm = use_reward_norm
        self.use_reward_scaling = use_reward_scaling
        self.entropy_coef = entropy_coef
        self.use_lr_decay = use_lr_decay
        self.use_grad_clip = use_grad_clip
        self.use_orthogonal_init = use_orthogonal_init
        self.set_adam_eps = set_adam_eps
        self.use_tanh = use_tanh
        self.max_episode_steps = max_episode_steps

    def print_information(self):
        print("Maximum number of training steps:", self.max_train_steps)
        print("Evaluate the policy every 'evaluate_freq' steps:", self.evaluate_freq)
        print("Save frequency:", self.save_freq)
        print("Beta or Gaussian:", self.policy_dist)
        print("Batch size:", self.batch_size)
        print("Minibatch size:", self.mini_batch_size)
        print("The number of neurons in hidden layers of the neural network:", self.hidden_width)
        print("Learning rate of actor:", self.lr_a)
        print("Learning rate of critic:", self.lr_c)
        print("Discount factor:", self.gamma)
        print("GAE parameter:", self.lamda)
        print("PPO clip parameter:", self.epsilon)
        print("PPO parameter:", self.K_epochs)
        print("Trick 1: advantage normalization:", self.use_adv_norm)
        print("Trick 2: state normalization:", self.use_state_norm)
        print("Trick 3: reward normalization:", self.use_reward_norm)
        print("Trick 4: reward scaling:", self.use_reward_scaling)
        print("Trick 5: policy entropy:", self.entropy_coef)
        print("Trick 6: learning rate decay:", self.use_lr_decay)
        print("Trick 7: gradient clip:", self.use_grad_clip)
        print("Trick 8: orthogonal initialization:", self.use_orthogonal_init)
        print("Trick 9: set Adam epsilon=1e-5:", self.set_adam_eps)
        print("Trick 10: tanh activation function:", self.use_tanh)


# Train the network
def train_network(args, env, show_picture=True):
    epsiode_rewards = []
    epsiode_mean_rewards = []
    # read the environment parameters
    args.state_dim = env.observation_space.shape[0]
    args.action_dim = env.action_space.shape[0]
    args.max_action = float(env.action_space[0][1])
    # define a replay buffer
    replay_buffer = ReplayBuffer(args)
    # define the PPO agent
    agent = PPO_continuous(args)
    # Trick 2: state normalization
    state_norm = Normalization(shape=args.state_dim)
    if args.use_reward_norm:
        reward_norm = Normalization(shape=1)
    elif args.use_reward_scaling:  # Trick 4: reward scaling
        reward_scaling = RewardScaling(shape=1, gamma=args.gamma)

    # training loop
    for epsiode in range(args.max_train_steps):
        # initialize the episode statistics
        epsiode_reward = 0.0
        done = False
        epsiode_count = 0
        # get a new initial state
        s = env.reset()
        # normalize the state
        if args.use_state_norm:
            s = state_norm(s)
        if args.use_reward_scaling:
            reward_scaling.reset()
        # loop until the episode terminates
        while True:
            # count the steps of this episode
            epsiode_count += 1
            a, a_logprob = agent.choose_action(s)
            # rescale the action depending on whether the output is Gaussian or Beta
            if args.policy_dist == "Beta":
                action = 2 * (a - 0.5) * args.max_action
            else:
                action = a
            # interact with the environment
            s_, r, done = env.step(action)  # !!! The environment is home-made, so everyone's outputs differ
            s_ = s_.T[0]  # !!! This step is not really necessary; it is only here because my environment has an issue
            epsiode_reward += r
            # state normalization
            if args.use_state_norm:
                s_ = state_norm(s_)
            # reward normalization / scaling
            if args.use_reward_norm:
                r = reward_norm(r)
            elif args.use_reward_scaling:
                r = reward_scaling(r)
            # check the episode step limit (episode finished or maximum number of steps reached)
            if done or epsiode_count >= args.max_episode_steps:
                dw = True
            else:
                dw = False
            # store the transition in the replay buffer
            replay_buffer.store(s, action, a_logprob, r, s_, dw, done)
            # move to the next state
            s = s_
            # once the replay buffer reaches batch_size, start training
            if replay_buffer.count == args.batch_size:
                agent.update(replay_buffer, epsiode)
                replay_buffer.count = 0
            # leave the loop when the episode ends
            if done:
                epsiode_rewards.append(epsiode_reward)
                epsiode_mean_rewards.append(np.mean(epsiode_rewards))
                print("Episode {} reward: {:.2f}, average reward: {:.2f}".format(
                    epsiode, epsiode_reward, epsiode_mean_rewards[-1]))
                break

    # plot the training curves if required
    if show_picture:
        plt.plot(epsiode_rewards)
        plt.plot(epsiode_mean_rewards)
        plt.xlabel("epsiodes")
        plt.ylabel("rewards")
        plt.title("Continuous PPO With Optimization")
        plt.legend(["rewards", "mean_rewards"])
        plt.show()
    return agent


# Run one episode with the trained network
def test_network(args, env, agent, show_pictures=True):
    epsiode_reward = 0.0
    done = False
    epsiode_count = 0
    state_norm = Normalization(shape=args.state_dim)
    # get a new initial state
    s = env.reset()
    x_array, y_array = [s[0]], [s[1]]
    if args.use_reward_norm:
        reward_norm = Normalization(shape=1)
        s = state_norm(s)
    elif args.use_reward_scaling:  # Trick 4: reward scaling
        reward_scaling = RewardScaling(shape=1, gamma=args.gamma)
        reward_scaling.reset()
    # loop until the episode terminates
    while True:
        # count the steps of this episode
        epsiode_count += 1
        a, a_logprob = agent.choose_action(s)
        # rescale the action depending on whether the output is Gaussian or Beta
        if args.policy_dist == "Beta":
            action = 2 * (a - 0.5) * args.max_action
        else:
            action = a
        # interact with the environment
        s_, r, done = env.step(action)  # !!! The environment is home-made, so everyone's outputs differ
        s_ = s_.T[0]  # !!! This step is not really necessary; it is only here because my environment has an issue
        x_array.append(s_[0])
        y_array.append(s_[1])
        epsiode_reward += r
        # state normalization
        if args.use_state_norm:
            s_ = state_norm(s_)
        # move to the next state
        s = s_
        # leave the loop when the episode ends or the step limit is reached
        if done or epsiode_count >= 2000:
            print("Current test score: {}".format(epsiode_reward))
            break

    # plot the trajectory
    if show_pictures:
        circle_x, circle_y = [], []
        for theta in np.linspace(-np.pi, np.pi):
            circle_x.append(env.SAM_x + env.SAM_range * np.cos(theta))
            circle_y.append(env.SAM_y + env.SAM_range * np.sin(theta))
        plt.plot(x_array, y_array, 'b.-', linewidth=1)
        plt.plot(env.SAM_x, env.SAM_y, 'ro', linewidth=5)
        plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
        plt.plot(env.target_x, env.target_y, 'go', linewidth=5)
        plt.plot(x_array[-1], y_array[-1], 'm*', linewidth=5)
        plt.title("one road rewards:{:.2f}".format(epsiode_reward))
        plt.show()
    return x_array, y_array, epsiode_reward


# main
if __name__ == "__main__":
    # declare the hyper-parameters and the environment
    args = args_param(max_episode_steps=1000, batch_size=64, max_train_steps=1000, K_epochs=3)
    env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
    env.test(show_pictures=True, show_tips=False)
    agent = train_network(args, env)
    x_array, y_array, _ = test_network(args, env, agent)
```