

Reinforcement-Learning-Based Air Combat Decision Support (2D)

1. Basic Scenario

Unlike the earlier 1D problem, this task extends the problem to 2D. As shown in the figure below, only three combat units are considered: the blue Fighter, the red Target, and the red surface-to-air missile (SAM) defence system.

[Figure: scenario layout of the Fighter, SAM and Target]

  • Objective: train the Fighter to learn a heuristic policy that, at minimal time cost, skirts the SAM's missile engagement zone, avoids being fired upon, and then attacks and destroys the Target directly.
  • Constraints: the initial positions of the Fighter, SAM and Target are randomized, but their relative bearings should roughly follow the layout shown in the figure above (a possible initialization sketch is given after this list). The table below lists the initial positions of the three units and the firing ranges of the Fighter and the SAM:

[Table (image): initial positions of the three units and the firing ranges of the Fighter and the SAM]

  • Control: at each time step the fighter's next heading is the only action variable, a continuous control value in $[0, 2\pi]$.
  • Assumptions: the agent chooses the action $a_t$ based only on the Fighter's position in the current state $s_t$, independently of the earlier states $s_i\ (i = 0, 1, \dots, t-1)$. In the original paper the Fighter's speed is kept constant to limit the size of the state space; in the extension part we also consider adding the Fighter's acceleration to the action space. The SAM system has partial omnidirectional detection: it can only detect a Fighter that falls within its range, whereas the Fighter fully observes the SAM's state.
  • Platform and algorithms: the original paper uses AFGYM/AFSIM as the base platform. Q-Learning is no longer used as the solver, since it has no advantage over policy-gradient methods on high-dimensional state spaces; behaviour trained with Q-Learning plus REINFORCE ended up performing little better than a random walk. GANs are also unsuitable because decisions must be made in real time. The paper therefore trains the original problem with the A3C and PPO algorithms.
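As a reference for the initialization constraint above, here is a minimal sketch, not from the original code, of one way to randomize the initial positions while roughly preserving the Fighter, SAM and Target layout of the figure; the helper name and the exact sampling regions are assumptions:

import numpy as np

def random_initial_positions(L=100.0, rng=np.random.default_rng()):
    """Hypothetical helper: random positions that roughly keep the
    Fighter (lower-left), SAM (centre), Target (beyond the SAM) layout."""
    fighter = rng.uniform(0.0, 0.25 * L, size=2)      # lower-left region
    sam     = rng.uniform(0.4 * L, 0.6 * L, size=2)   # central region
    target  = rng.uniform(0.65 * L, 0.9 * L, size=2)  # upper-right region, beyond the SAM
    return fighter, sam, target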

2. Basic Model

2.1 State Space and Action Space

The state vector $s_t$ of the original scenario consists of the following 11-dimensional vector:

[Figure: the 11 components of the state vector $s_t$]

Note: during training, all state components except the 0-1 variables are normalized; otherwise gradient back-propagation would be hampered. In the implementation below, SAM pointing and Target heading are not yet considered, so the state space is compressed to 9 dimensions; adding them back is one concrete direction for later improvement. The Fighter heading serves as the basic action $a_t$.

The final state $s_t$ therefore lives in the state space $S = [0,1]^{8}$, while the action $a_t$ lives in the action space $A = [0,1]$, which is mapped to the heading range $[-180^{\circ}, 180^{\circ}]$.
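To make these conventions concrete, here is a minimal sketch (assumed helpers, not part of the original code) of how a position can be normalized by the map size and how a normalized action in $[0,1]$ maps to a heading in $[-\pi, \pi]$:

import numpy as np

# Assumed helpers illustrating the normalization conventions of Section 2.1.
def normalize_position(x, y, L=100.0):
    # positions are scaled by the map size L; the remaining state entries are analogous
    return np.array([x / L, y / L], dtype=np.float32)

def action_to_heading(a):
    # a in [0, 1] -> heading in [-pi, pi], i.e. [-180°, 180°]
    return (2.0 * a - 1.0) * np.pi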

2.2 Reward Function

Each time the aircraft flies one time step $\Delta t$: if it stays outside the SAM's firing range, the reward is -1 and the episode continues; if it enters the SAM's firing range, the reward is -50 and the episode continues; if it is inside the SAM's firing range and is shot down by the SAM, the reward is -1000 and the episode ends; if it gets near the Target, the reward is +50 and the episode continues; if it is near the Target and successfully strikes it, the reward is +1000 and the episode ends.
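As a compact illustration, this reward scheme can be written as a single function; the flag names below are hypothetical and only mirror the five cases just listed:

def reward_and_done(in_sam_range, shot_down, near_target, target_destroyed):
    """Sketch of the reward scheme above; all arguments are hypothetical boolean flags."""
    if shot_down:             # inside SAM range and shot down
        return -1000, True
    if target_destroyed:      # near the Target and the strike succeeds
        return 1000, True
    if near_target:           # merely near the Target
        return 50, False
    if in_sam_range:          # inside SAM range but not (yet) shot down
        return -50, False
    return -1, False          # ordinary time step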

2.3 State Transition Equation

The model only considers the Fighter moving at constant speed; adding penetration manoeuvres (e.g. speed changes) later is a possible research direction (2).

Assume the Fighter's speed is a constant $v$. At time $t$, let the Fighter's heading be $a_t$ (the action variable) and its position be $(x_J^{(t)}, y_J^{(t)})$. Then:

$$x_J^{(t+1)} = x_J^{(t)} + v\,\Delta t \cos a_t, \qquad y_J^{(t+1)} = y_J^{(t)} + v\,\Delta t \sin a_t$$
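As a quick numerical check of this transition (illustrative values only; $v$ and $\Delta t$ are the defaults of the environment built later, the position and heading are made up):

import numpy as np

v, delta_t = 2175.0 / 3600, 10           # speed and time step from the environment defaults
x_t, y_t, a_t = 20.0, 30.0, np.pi / 4    # hypothetical current position and heading
x_next = x_t + v * delta_t * np.cos(a_t)
y_next = y_t + v * delta_t * np.sin(a_t)
print(x_next, y_next)                    # approximately (24.27, 34.27)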

Let the Fighter's ammunition count be $w_t$; if the Fighter strikes a target at time $t$, then $w_{t+1} = w_t - dw_t$. Here we ignore ammunition consumption and assume the Fighter carries sufficient ammunition; relaxing this is another possible research direction (3). Let the SAM's engagement zone be $R$, taken here as a circle of radius $R_{SAM}$ centred on the SAM, where $R_{SAM}$ is the SAM's weapon engagement range.

3. Building the Environment

The code is given directly below:

import numpy as np

class Fighter:
    def __init__(self, L_limits=100, x=0, y=0, SAM_x=50, SAM_y=50,
                 target_x=70, target_y=80, velocity=2175.0/3600,
                 delta_t=10, dead=False, SAM_range=40, SAM_time=60,
                 Fighter_range=10, firing_range=3.5, weapon_count=20):
        self.L_limits = L_limits          # map boundary
        self.x = x                        # fighter x coordinate
        self.y = y                        # fighter y coordinate
        self.SAM_x = SAM_x
        self.SAM_y = SAM_y
        self.target_x = target_x
        self.target_y = target_y
        # basic position attributes above
        self.SAM_disable = False
        self.SAM_time = SAM_time          # SAM preparation (lock-on) time
        self.SAM_range = SAM_range
        self.firing_range = firing_range  # fighter weapon range
        self.weapon_count = weapon_count  # fighter ammunition capacity
        self.velocity = velocity
        self.delta_t = delta_t
        self.dead = dead
        self.wait_time = 0
        self.burn_reward = -1000
        self.win_reward = 1000
        self.reward = -1                  # reward for an ordinary step

    # reset the fighter to a random initial position
    def reset(self):
        self.x = self.L_limits * np.random.random()
        self.y = self.L_limits * np.random.random()
        self.dead = False
        self.wait_time = 0
        self.SAM_disable = False
        return self.x, self.y

    # state transition equation (headings are in radians)
    def step(self, fighter_heading):
        fighter_heading = np.clip(fighter_heading, -np.pi, np.pi)
        self.x = self.x + self.velocity * self.delta_t * np.cos(fighter_heading)
        self.y = self.y + self.velocity * self.delta_t * np.sin(fighter_heading)
        # clamp the fighter horizontally
        if self.x <= 0:
            self.x = 0
        elif self.x > self.L_limits:
            self.x = self.L_limits
        # clamp the fighter vertically
        if self.y <= 0:
            self.y = 0
        elif self.y > self.L_limits:
            self.y = self.L_limits
        # check whether the fighter is inside the SAM engagement zone
        if self.SAM_disable == False:
            d = np.sqrt((self.x - self.SAM_x) ** 2 + (self.y - self.SAM_y) ** 2)
            if d <= self.SAM_range:
                self.wait_time = self.wait_time + self.delta_t
            else:
                # once the fighter leaves the SAM range, reset the timer
                self.wait_time = 0
            # if the fighter lingers inside the SAM zone long enough,
            # it is shot down and receives a large negative reward
            if self.wait_time >= self.SAM_time:
                self.wait_time = 0
                return (self.x, self.y), self.burn_reward, True
            # when close enough, the fighter destroys the SAM immediately
            # (a key milestone rewarded with +50)
            if d <= self.firing_range and self.SAM_disable == False:
                self.SAM_disable = True
                self.wait_time = 0
                return (self.x, self.y), 50, False
        # when the fighter reaches the target
        if np.sqrt((self.target_x - self.x) ** 2 + (self.target_y - self.y) ** 2) <= self.firing_range:
            return (self.x, self.y), self.win_reward, True
        # otherwise the fighter keeps flying and receives -1
        return (self.x, self.y), -1, False
        # note: reinforcement learning could perhaps be combined with fault-tree analysis

Below is test code using a random-walk policy:

import matplotlib.pyplot as plt

x_array, y_array = [], []
rewards = 0.0
SAM_x, SAM_y = 50, 50
target_x, target_y = 80, 70
SAM_range = 20
fighter = Fighter(SAM_x=SAM_x, SAM_y=SAM_y, target_x=target_x,
                  target_y=target_y, SAM_range=SAM_range)
while True:
    fighter_heading = np.random.random() * 2 * np.pi - np.pi
    next_state, reward, done = fighter.step(fighter_heading)
    rewards += reward
    print("Fighter position [{},{}], reward {}, cumulative reward {}".format(
        next_state[0], next_state[1], reward, rewards))
    x_array.append(next_state[0])
    y_array.append(next_state[1])
    if done:
        break

# draw the SAM engagement zone as a circle
circle_x, circle_y = [], []
for theta in np.linspace(-np.pi, np.pi):
    circle_x.append(SAM_x + SAM_range * np.cos(theta))
    circle_y.append(SAM_y + SAM_range * np.sin(theta))
plt.figure(figsize=(5, 5))
plt.plot(x_array, y_array, 'b.-', linewidth=1)
plt.plot(SAM_x, SAM_y, 'ro', linewidth=5)
plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
plt.plot(target_x, target_y, 'go', linewidth=5)
plt.title('one road')

The test results are as follows.
A case in which the fighter is shot down by the SAM:
[Figure: random-walk trajectory ending inside the SAM zone]
A case in which the fighter successfully skirts the SAM:
[Figure: random-walk trajectory that avoids the SAM]

4. Implementation

The problem above is solved with continuous PPO plus the usual optimization tricks; the following libraries need to be imported:

import torch
import matplotlib.pyplot as plt
import numpy as np
from normalization import Normalization, RewardScaling
from replaybuffer import ReplayBuffer
from ppo_continuous import PPO_continuous
from Fighter_env import Fighter
from CPPO_main import args_param, train_network, test_network

The following main script is used to solve the problem:

# declare the hyper-parameters and the environment
args = args_param(max_episode_steps=2000, batch_size=64, max_train_steps=5000, K_epochs=3)
env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
env.test(show_pictures=True, show_tips=False)   # sanity-check the environment with a random walk
agent = train_network(args, env)
x_array, y_array, _ = test_network(args, env, agent)
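Optionally, the trained actor can be persisted and reloaded with the standard PyTorch state-dict utilities; this step is not part of the original script and assumes the PPO_continuous agent defined in ppo_continuous.py below, which exposes an actor sub-module:

import torch

# save and reload the trained actor weights (file name is arbitrary)
torch.save(agent.actor.state_dict(), "fighter_ppo_actor.pth")
agent.actor.load_state_dict(torch.load("fighter_ppo_actor.pth"))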

This produces the following convergence curve of the mean reward:
[Figure: episode reward and mean reward during training]
and the following test results:
[Figures: trajectories produced by the trained policy over several test episodes]
It seems that training the network with continuous-action PPO on this problem is still quite difficult!!!

5. Code

  • Save the following code in normalization.py for state normalization and reward scaling:
import numpy as np

class RunningMeanStd:
    # dynamically calculate the mean and std of the input data
    def __init__(self, shape):  # shape: the dimension of the input data
        self.n = 0
        self.mean = np.zeros(shape)
        self.S = np.zeros(shape)
        self.std = np.sqrt(self.S)

    def update(self, x):
        x = np.array(x)
        self.n += 1
        if self.n == 1:
            self.mean = x
            self.std = x
        else:
            old_mean = self.mean.copy()
            self.mean = old_mean + (x - old_mean) / self.n
            self.S = self.S + (x - old_mean) * (x - self.mean)
            self.std = np.sqrt(self.S / self.n)

class Normalization:
    def __init__(self, shape):
        self.running_ms = RunningMeanStd(shape=shape)

    def __call__(self, x, update=True):
        # whether to update the mean and std; during evaluation, update=False
        if update:
            self.running_ms.update(x)
        x = (x - self.running_ms.mean) / (self.running_ms.std + 1e-8)
        return x

class RewardScaling:
    def __init__(self, shape, gamma):
        self.shape = shape  # reward shape = 1
        self.gamma = gamma  # discount factor
        self.running_ms = RunningMeanStd(shape=self.shape)
        self.R = np.zeros(self.shape)

    def __call__(self, x):
        self.R = self.gamma * self.R + x
        self.running_ms.update(self.R)
        x = x / (self.running_ms.std + 1e-8)  # only divide by the std
        return x

    def reset(self):  # when an episode is done, reset 'self.R'
        self.R = np.zeros(self.shape)
  • Write the following code in Fighter_env.py to define the fighter environment:
import numpy as np
import matplotlib.pyplot as plt

# the class below defines the environment
class Fighter:
    def __init__(self, L_limits=100, x=0, y=0, SAM_x=50, SAM_y=50,
                 target_x=70, target_y=80, velocity=2175.0/3600,
                 delta_t=10, dead=False, SAM_range=40, SAM_time=60,
                 Fighter_range=10, weapon_count=20):
        self.L_limits = L_limits            # map boundary
        self.x = x                          # fighter x coordinate
        self.y = y                          # fighter y coordinate
        self.SAM_x = SAM_x
        self.SAM_y = SAM_y
        self.target_x = target_x
        self.target_y = target_y
        # basic position attributes above
        self.SAM_disable = False
        self.SAM_time = SAM_time            # SAM preparation (lock-on) time
        self.SAM_range = SAM_range
        self.firing_range = Fighter_range   # fighter weapon range
        self.weapon_count = weapon_count    # fighter ammunition capacity
        self.velocity = velocity
        self.delta_t = delta_t
        self.dead = dead
        self.wait_time = 0
        self.burn_reward = -1000
        self.win_reward = 1000
        self.reward = -1                    # reward for an ordinary step
        # gym-style descriptions of the observation and action spaces
        self.observation_space = np.array([[0, self.L_limits], [0, self.L_limits]])
        self.action_space = np.array([[-np.pi, np.pi]])

    # reset the fighter's initial position
    def reset(self):
        # self.x = self.L_limits * np.random.random()
        # self.y = self.L_limits * np.random.random()
        self.x = 0
        self.y = 0
        self.dead = False
        self.wait_time = 0
        self.SAM_disable = False
        return np.array([self.x, self.y], dtype=np.float32)

    # state transition equation (headings are in radians)
    def step(self, fighter_heading):
        fighter_heading = np.clip(fighter_heading, -np.pi, np.pi)
        self.x = self.x + self.velocity * self.delta_t * np.cos(fighter_heading)
        self.y = self.y + self.velocity * self.delta_t * np.sin(fighter_heading)
        self.x = np.clip(self.x, a_min=0.0, a_max=self.L_limits)
        self.y = np.clip(self.y, a_min=0.0, a_max=self.L_limits)
        # when the fighter reaches the target
        if np.linalg.norm(np.array([self.target_x - self.x, self.target_y - self.y], dtype=np.float32)) <= self.firing_range:
            return np.array([self.x, self.y], dtype=np.float32), self.win_reward, True
        # check whether the fighter is inside the SAM engagement zone
        d = np.linalg.norm(np.array([self.x - self.SAM_x, self.y - self.SAM_y], dtype=np.float32))
        if self.SAM_disable == False:
            if d <= self.SAM_range:
                self.wait_time = self.wait_time + self.delta_t
            else:
                # once the fighter leaves the SAM range, reset the timer
                self.wait_time = 0
            # if the fighter lingers inside the SAM zone long enough,
            # it is shot down and receives a large negative reward
            if self.wait_time >= self.SAM_time:
                self.wait_time = 0
                return np.array([self.x, self.y], dtype=np.float32), self.burn_reward, True
            # when close enough, the fighter destroys the SAM immediately
            # (a key milestone rewarded with +50)
            if d <= self.firing_range and self.SAM_disable == False:
                self.SAM_disable = True
                self.wait_time = 0
                return np.array([self.x, self.y], dtype=np.float32), 50, False
        # otherwise the fighter keeps flying and receives -1
        return np.array([self.x, self.y], dtype=np.float32), -1, False
        # note: reinforcement learning could perhaps be combined with fault-tree analysis

    # run one episode with a random-walk policy for testing
    def test(self, show_tips=True, show_pictures=True):
        x_array, y_array = [], []
        rewards = 0.0
        while True:
            fighter_heading = np.random.random() * 2 * np.pi - np.pi
            next_state, reward, done = self.step(fighter_heading)
            rewards += reward
            if show_tips:
                print("Fighter position [{},{}], reward {}, cumulative reward {}".format(
                    next_state[0], next_state[1], reward, rewards))
            x_array.append(next_state[0])
            y_array.append(next_state[1])
            if done:
                break
        # if requested, plot the trajectory together with the SAM engagement circle
        if show_pictures:
            circle_x, circle_y = [], []
            for theta in np.linspace(-np.pi, np.pi):
                circle_x.append(self.SAM_x + self.SAM_range * np.cos(theta))
                circle_y.append(self.SAM_y + self.SAM_range * np.sin(theta))
            plt.figure(figsize=(5, 5))
            plt.plot(x_array, y_array, 'b.-', linewidth=1)
            plt.plot(self.SAM_x, self.SAM_y, 'ro', linewidth=5)
            plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
            plt.plot(self.target_x, self.target_y, 'go', linewidth=5)
            plt.plot(x_array[-1], y_array[-1], 'm*', linewidth=5)
            plt.title('one road')
            plt.show()

  • Save the ReplayBuffer class in replaybuffer.py:

import numpy as np
import torch

class ReplayBuffer:
    def __init__(self, args):
        self.state_dim = args.state_dim
        self.action_dim = args.action_dim
        self.batch_size = args.batch_size
        self.s = np.zeros((args.batch_size, args.state_dim))
        self.a = np.zeros((args.batch_size, args.action_dim))
        self.a_logprob = np.zeros((args.batch_size, args.action_dim))
        self.r = np.zeros((args.batch_size, 1))
        self.s_ = np.zeros((args.batch_size, args.state_dim))
        self.dw = np.zeros((args.batch_size, 1))
        self.done = np.zeros((args.batch_size, 1))
        self.count = 0

    def store(self, s, a, a_logprob, r, s_, dw, done):
        self.s[self.count] = s
        self.a[self.count] = a
        self.a_logprob[self.count] = a_logprob
        self.r[self.count] = r
        self.s_[self.count] = s_
        self.dw[self.count] = dw
        self.done[self.count] = done
        self.count += 1

    def numpy_to_tensor(self):
        s = torch.tensor(self.s, dtype=torch.float)
        a = torch.tensor(self.a, dtype=torch.float)
        a_logprob = torch.tensor(self.a_logprob, dtype=torch.float)
        r = torch.tensor(self.r, dtype=torch.float)
        s_ = torch.tensor(self.s_, dtype=torch.float)
        dw = torch.tensor(self.dw, dtype=torch.float)
        done = torch.tensor(self.done, dtype=torch.float)
        return s, a, a_logprob, r, s_, dw, done
  • The file ppo_continuous.py contains the continuous PPO algorithm with optimization tricks:
import torch
import torch.nn.functional as F
from torch.utils.data.sampler import BatchSampler, SubsetRandomSampler
import torch.nn as nn
from torch.distributions import Beta, Normal

# Trick 8: orthogonal initialization
def orthogonal_init(layer, gain=1.0):
    nn.init.orthogonal_(layer.weight, gain=gain)
    nn.init.constant_(layer.bias, 0)

class Actor_Beta(nn.Module):
    def __init__(self, args):
        super(Actor_Beta, self).__init__()
        self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
        self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
        self.alpha_layer = nn.Linear(args.hidden_width, args.action_dim)
        self.beta_layer = nn.Linear(args.hidden_width, args.action_dim)
        self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh]  # Trick 10: use tanh
        if args.use_orthogonal_init:
            print("------use_orthogonal_init------")
            orthogonal_init(self.fc1)
            orthogonal_init(self.fc2)
            orthogonal_init(self.alpha_layer, gain=0.01)
            orthogonal_init(self.beta_layer, gain=0.01)

    def forward(self, s):
        s = self.activate_func(self.fc1(s))
        s = self.activate_func(self.fc2(s))
        # alpha and beta need to be larger than 1, so we use 'softplus' as the activation function and then add 1
        alpha = F.softplus(self.alpha_layer(s)) + 1.0
        beta = F.softplus(self.beta_layer(s)) + 1.0
        return alpha, beta

    def get_dist(self, s):
        alpha, beta = self.forward(s)
        dist = Beta(alpha, beta)
        return dist

    def mean(self, s):
        alpha, beta = self.forward(s)
        mean = alpha / (alpha + beta)  # the mean of the Beta distribution
        return mean

class Actor_Gaussian(nn.Module):
    def __init__(self, args):
        super(Actor_Gaussian, self).__init__()
        self.max_action = args.max_action
        self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
        self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
        self.mean_layer = nn.Linear(args.hidden_width, args.action_dim)
        self.log_std = nn.Parameter(torch.zeros(1, args.action_dim))  # use 'nn.Parameter' to train log_std automatically
        self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh]  # Trick 10: use tanh
        if args.use_orthogonal_init:
            print("------use_orthogonal_init------")
            orthogonal_init(self.fc1)
            orthogonal_init(self.fc2)
            orthogonal_init(self.mean_layer, gain=0.01)

    def forward(self, s):
        s = self.activate_func(self.fc1(s))
        s = self.activate_func(self.fc2(s))
        mean = self.max_action * torch.tanh(self.mean_layer(s))  # [-1,1] -> [-max_action, max_action]
        return mean

    def get_dist(self, s):
        mean = self.forward(s)
        log_std = self.log_std.expand_as(mean)  # make 'log_std' have the same dimension as 'mean'
        std = torch.exp(log_std)  # training log_std ensures std = exp(log_std) > 0
        dist = Normal(mean, std)  # the Gaussian distribution
        return dist

class Critic(nn.Module):
    def __init__(self, args):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(args.state_dim, args.hidden_width)
        self.fc2 = nn.Linear(args.hidden_width, args.hidden_width)
        self.fc3 = nn.Linear(args.hidden_width, 1)
        self.activate_func = [nn.ReLU(), nn.Tanh()][args.use_tanh]  # Trick 10: use tanh
        if args.use_orthogonal_init:
            print("------use_orthogonal_init------")
            orthogonal_init(self.fc1)
            orthogonal_init(self.fc2)
            orthogonal_init(self.fc3)

    def forward(self, s):
        s = self.activate_func(self.fc1(s))
        s = self.activate_func(self.fc2(s))
        v_s = self.fc3(s)
        return v_s

class PPO_continuous():
    def __init__(self, args):
        self.policy_dist = args.policy_dist
        self.max_action = args.max_action
        self.batch_size = args.batch_size
        self.mini_batch_size = args.mini_batch_size
        self.max_train_steps = args.max_train_steps
        self.lr_a = args.lr_a                   # learning rate of the actor
        self.lr_c = args.lr_c                   # learning rate of the critic
        self.gamma = args.gamma                 # discount factor
        self.lamda = args.lamda                 # GAE parameter
        self.epsilon = args.epsilon             # PPO clip parameter
        self.K_epochs = args.K_epochs           # PPO parameter
        self.entropy_coef = args.entropy_coef   # entropy coefficient
        self.set_adam_eps = args.set_adam_eps
        self.use_grad_clip = args.use_grad_clip
        self.use_lr_decay = args.use_lr_decay
        self.use_adv_norm = args.use_adv_norm
        if self.policy_dist == "Beta":
            self.actor = Actor_Beta(args)
        else:
            self.actor = Actor_Gaussian(args)
        self.critic = Critic(args)
        if self.set_adam_eps:  # Trick 9: set Adam epsilon=1e-5
            self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a, eps=1e-5)
            self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c, eps=1e-5)
        else:
            self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.lr_a)
            self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.lr_c)

    def evaluate(self, s):  # when evaluating the policy, only the mean is used
        s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0)
        if self.policy_dist == "Beta":
            a = self.actor.mean(s).detach().numpy().flatten()
        else:
            a = self.actor(s).detach().numpy().flatten()
        return a

    def choose_action(self, s):
        s = torch.unsqueeze(torch.tensor(s, dtype=torch.float), 0)
        if self.policy_dist == "Beta":
            with torch.no_grad():
                dist = self.actor.get_dist(s)
                a = dist.sample()             # sample the action from the probability distribution
                a_logprob = dist.log_prob(a)  # log probability density of the action
        else:
            with torch.no_grad():
                dist = self.actor.get_dist(s)
                a = dist.sample()             # sample the action from the probability distribution
                a = torch.clamp(a, -self.max_action, self.max_action)  # [-max, max]
                a_logprob = dist.log_prob(a)  # log probability density of the action
        return a.numpy().flatten(), a_logprob.numpy().flatten()

    def update(self, replay_buffer, total_steps):
        s, a, a_logprob, r, s_, dw, done = replay_buffer.numpy_to_tensor()  # get training data
        """
        Calculate the advantage using GAE.
        'dw=True' means dead or win, i.e. there is no next state s'.
        'done=True' represents the terminal of an episode (dead or win or reaching max_episode_steps).
        When calculating the advantage, if done=True then gae is reset to 0.
        """
        adv = []
        gae = 0
        with torch.no_grad():  # adv and v_target have no gradient
            vs = self.critic(s)
            vs_ = self.critic(s_)
            deltas = r + self.gamma * (1.0 - dw) * vs_ - vs
            for delta, d in zip(reversed(deltas.flatten().numpy()), reversed(done.flatten().numpy())):
                gae = delta + self.gamma * self.lamda * gae * (1.0 - d)
                adv.insert(0, gae)
            adv = torch.tensor(adv, dtype=torch.float).view(-1, 1)
            v_target = adv + vs
            if self.use_adv_norm:  # Trick 1: advantage normalization
                adv = ((adv - adv.mean()) / (adv.std() + 1e-5))

        # optimize the policy for K epochs
        for _ in range(self.K_epochs):
            # random sampling without repetition; 'False' means the last mini-batch is used
            # even if it contains fewer than mini_batch_size samples
            for index in BatchSampler(SubsetRandomSampler(range(self.batch_size)), self.mini_batch_size, False):
                dist_now = self.actor.get_dist(s[index])
                dist_entropy = dist_now.entropy().sum(1, keepdim=True)  # shape (mini_batch_size, 1)
                a_logprob_now = dist_now.log_prob(a[index])
                # a/b = exp(log(a) - log(b)); in a multi-dimensional continuous action space we sum the log_prob
                ratios = torch.exp(a_logprob_now.sum(1, keepdim=True) - a_logprob[index].sum(1, keepdim=True))  # shape (mini_batch_size, 1)
                surr1 = ratios * adv[index]  # only the gradient of 'a_logprob_now' flows through ratios
                surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * adv[index]
                actor_loss = -torch.min(surr1, surr2) - self.entropy_coef * dist_entropy  # Trick 5: policy entropy
                # update the actor
                self.optimizer_actor.zero_grad()
                actor_loss.mean().backward()
                if self.use_grad_clip:  # Trick 7: gradient clip
                    torch.nn.utils.clip_grad_norm_(self.actor.parameters(), 0.5)
                self.optimizer_actor.step()

                v_s = self.critic(s[index])
                critic_loss = F.mse_loss(v_target[index], v_s)
                # update the critic
                self.optimizer_critic.zero_grad()
                critic_loss.backward()
                if self.use_grad_clip:  # Trick 7: gradient clip
                    torch.nn.utils.clip_grad_norm_(self.critic.parameters(), 0.5)
                self.optimizer_critic.step()

        if self.use_lr_decay:  # Trick 6: learning rate decay
            self.lr_decay(total_steps)

    def lr_decay(self, total_steps):
        lr_a_now = self.lr_a * (1 - total_steps / self.max_train_steps)
        lr_c_now = self.lr_c * (1 - total_steps / self.max_train_steps)
        for p in self.optimizer_actor.param_groups:
            p['lr'] = lr_a_now
        for p in self.optimizer_critic.param_groups:
            p['lr'] = lr_c_now
  • The following code, CPPO_main.py, trains and tests the network:
import matplotlib.pyplot as plt
import numpy as np
from normalization import Normalization, RewardScaling
from replaybuffer import ReplayBuffer
from ppo_continuous import PPO_continuous
from Fighter_env import Fighter

# a parameter class that stores all hyper-parameters
class args_param(object):
    def __init__(self, max_train_steps=int(3e6),
                 evaluate_freq=5e3,
                 save_freq=20,
                 policy_dist="Gaussian",
                 batch_size=2048,
                 mini_batch_size=64,
                 hidden_width=64,
                 lr_a=3e-4,
                 lr_c=3e-4,
                 gamma=0.99,
                 lamda=0.95,
                 epsilon=0.2,
                 K_epochs=10,
                 max_episode_steps=1000,
                 use_adv_norm=True,
                 use_state_norm=True,
                 use_reward_norm=False,
                 use_reward_scaling=True,
                 entropy_coef=0.01,
                 use_lr_decay=True,
                 use_grad_clip=True,
                 use_orthogonal_init=True,
                 set_adam_eps=True,
                 use_tanh=True):
        self.max_train_steps = max_train_steps
        self.evaluate_freq = evaluate_freq
        self.save_freq = save_freq
        self.policy_dist = policy_dist
        self.batch_size = batch_size
        self.mini_batch_size = mini_batch_size
        self.hidden_width = hidden_width
        self.lr_a = lr_a
        self.lr_c = lr_c
        self.gamma = gamma
        self.lamda = lamda
        self.epsilon = epsilon
        self.K_epochs = K_epochs
        self.use_adv_norm = use_adv_norm
        self.use_state_norm = use_state_norm
        self.use_reward_norm = use_reward_norm
        self.use_reward_scaling = use_reward_scaling
        self.entropy_coef = entropy_coef
        self.use_lr_decay = use_lr_decay
        self.use_grad_clip = use_grad_clip
        self.use_orthogonal_init = use_orthogonal_init
        self.set_adam_eps = set_adam_eps
        self.use_tanh = use_tanh
        self.max_episode_steps = max_episode_steps

    def print_information(self):
        print("Maximum number of training steps:", self.max_train_steps)
        print("Evaluate the policy every 'evaluate_freq' steps:", self.evaluate_freq)
        print("Save frequency:", self.save_freq)
        print("Beta or Gaussian:", self.policy_dist)
        print("Batch size:", self.batch_size)
        print("Minibatch size:", self.mini_batch_size)
        print("The number of neurons in hidden layers of the neural network:", self.hidden_width)
        print("Learning rate of actor:", self.lr_a)
        print("Learning rate of critic:", self.lr_c)
        print("Discount factor:", self.gamma)
        print("GAE parameter:", self.lamda)
        print("PPO clip parameter:", self.epsilon)
        print("PPO parameter:", self.K_epochs)
        print("Trick 1: advantage normalization:", self.use_adv_norm)
        print("Trick 2: state normalization:", self.use_state_norm)
        print("Trick 3: reward normalization:", self.use_reward_norm)
        print("Trick 4: reward scaling:", self.use_reward_scaling)
        print("Trick 5: policy entropy:", self.entropy_coef)
        print("Trick 6: learning rate decay:", self.use_lr_decay)
        print("Trick 7: gradient clip:", self.use_grad_clip)
        print("Trick 8: orthogonal initialization:", self.use_orthogonal_init)
        print("Trick 9: set Adam epsilon=1e-5:", self.set_adam_eps)
        print("Trick 10: tanh activation function:", self.use_tanh)

# the function below trains the network
def train_network(args, env, show_picture=True):
    epsiode_rewards = []
    epsiode_mean_rewards = []
    # read the environment's dimensions
    args.state_dim = env.observation_space.shape[0]
    args.action_dim = env.action_space.shape[0]
    args.max_action = float(env.action_space[0][1])
    # define a replay buffer
    replay_buffer = ReplayBuffer(args)
    # define a PPO agent
    agent = PPO_continuous(args)
    # Trick 2: state normalization
    state_norm = Normalization(shape=args.state_dim)
    if args.use_reward_norm:
        reward_norm = Normalization(shape=1)
    elif args.use_reward_scaling:  # Trick 4: reward scaling
        reward_scaling = RewardScaling(shape=1, gamma=args.gamma)
    # training loop
    for epsiode in range(args.max_train_steps):
        # initialize the per-episode values
        epsiode_reward = 0.0
        done = False
        epsiode_count = 0
        # get a new initial state
        s = env.reset()
        # normalize the state if required
        if args.use_state_norm:
            s = state_norm(s)
        if args.use_reward_scaling:
            reward_scaling.reset()
        # loop until the episode terminates
        while True:
            # one more step in this episode
            epsiode_count += 1
            a, a_logprob = agent.choose_action(s)
            # rescale the output depending on the Gaussian/Beta policy
            if args.policy_dist == "Beta":
                action = 2 * (a - 0.5) * args.max_action
            else:
                action = a
            # interact with the environment
            s_, r, done = env.step(action)  # !!! this environment is home-made; its outputs may differ from yours
            s_ = s_.T[0]  # !!! strictly unnecessary; added because of a quirk of my environment
            epsiode_reward += r
            # state normalization
            if args.use_state_norm:
                s_ = state_norm(s_)
            # reward normalization / scaling
            if args.use_reward_norm:
                r = reward_norm(r)
            elif args.use_reward_scaling:
                r = reward_scaling(r)
            # the episode terminates or exceeds the maximum number of steps
            if done or epsiode_count >= args.max_episode_steps:
                dw = True
            else:
                dw = False
            # store the transition in the replay buffer
            replay_buffer.store(s, action, a_logprob, r, s_, dw, done)
            # move to the next state
            s = s_
            # once the replay buffer reaches batch_size, update the networks
            if replay_buffer.count == args.batch_size:
                agent.update(replay_buffer, epsiode)
                replay_buffer.count = 0
            # leave the loop when the episode ends
            if done:
                epsiode_rewards.append(epsiode_reward)
                epsiode_mean_rewards.append(np.mean(epsiode_rewards))
                print("Episode {}: reward {:.2f}, mean reward {:.2f}".format(
                    epsiode, epsiode_reward, epsiode_mean_rewards[-1]))
                break
    # plot the learning curve if required
    if show_picture:
        plt.plot(epsiode_rewards)
        plt.plot(epsiode_mean_rewards)
        plt.xlabel("epsiodes")
        plt.ylabel("rewards")
        plt.title("Continuous PPO With Optimization")
        plt.legend(["rewards", "mean_rewards"])
        plt.show()
    return agent

# run one episode with the trained network
def test_network(args, env, agent, show_pictures=True):
    epsiode_reward = 0.0
    done = False
    epsiode_count = 0
    state_norm = Normalization(shape=args.state_dim)
    # get a new initial state
    s = env.reset()
    x_array, y_array = [s[0]], [s[1]]
    if args.use_reward_norm:
        reward_norm = Normalization(shape=1)
        s = state_norm(s)
    elif args.use_reward_scaling:  # Trick 4: reward scaling
        reward_scaling = RewardScaling(shape=1, gamma=args.gamma)
        reward_scaling.reset()
    # loop until the episode terminates
    while True:
        epsiode_count += 1
        a, a_logprob = agent.choose_action(s)
        # rescale the output depending on the Gaussian/Beta policy
        if args.policy_dist == "Beta":
            action = 2 * (a - 0.5) * args.max_action
        else:
            action = a
        # interact with the environment
        s_, r, done = env.step(action)  # !!! home-made environment; outputs may differ from yours
        epsiode_reward += r
        s_ = s_.T[0]  # !!! strictly unnecessary; added because of a quirk of my environment
        x_array.append(s_[0])
        y_array.append(s_[1])
        # state normalization
        if args.use_state_norm:
            s_ = state_norm(s_)
        # move to the next state
        s = s_
        # leave the loop when the episode ends
        if done or epsiode_count >= 2000:
            print("Test score: {}".format(epsiode_reward))
            break
    # plot the result
    if show_pictures:
        circle_x, circle_y = [], []
        for theta in np.linspace(-np.pi, np.pi):
            circle_x.append(env.SAM_x + env.SAM_range * np.cos(theta))
            circle_y.append(env.SAM_y + env.SAM_range * np.sin(theta))
        plt.plot(x_array, y_array, 'b.-', linewidth=1)
        plt.plot(env.SAM_x, env.SAM_y, 'ro', linewidth=5)
        plt.plot(circle_x, circle_y, 'r.-', linewidth=1)
        plt.plot(env.target_x, env.target_y, 'go', linewidth=5)
        plt.plot(x_array[-1], y_array[-1], 'm*', linewidth=5)
        plt.title("one road rewards:{:.2f}".format(epsiode_reward))
        plt.show()
    return x_array, y_array, epsiode_reward

# main function
if __name__ == "__main__":
    # declare the hyper-parameters and the environment
    args = args_param(max_episode_steps=1000, batch_size=64, max_train_steps=1000, K_epochs=3)
    env = Fighter(delta_t=25, SAM_range=30, Fighter_range=10, SAM_time=30)
    env.test(show_pictures=True, show_tips=False)
    agent = train_network(args, env)
    x_array, y_array, _ = test_network(args, env, agent)

This article is reprinted from: https://blog.csdn.net/shengzimao/article/details/126787045
Copyright belongs to the original author 赛亚茂; in case of infringement, please contact us for removal.
