Open a trading chart, pile on ten technical indicators, then stare blankly at the screen with no idea what to do next: a scene every trader knows too well. What if you instead handed the historical data to a computer and told it to learn by trial and error? Reward it when it makes money, penalize it when it loses, and let it learn from repeated attempts and failures until it iterates its way to a trading strategy that is, if not perfect, at least internally consistent.
That is the core idea behind TensorTrade.
TensorTrade is an open-source Python framework focused on building and training trading algorithms with reinforcement learning (RL).
Data Acquisition and Feature Engineering
Here we use `yfinance` to pull the data and `pandas_ta` to compute technical indicators. Log returns, RSI, and MACD serve as a few basic feature inputs.
```bash
pip install yfinance pandas_ta
```
```python
import yfinance as yf
import pandas_ta as ta
import pandas as pd

# Pick your ticker
TICKER = "TTRD"  # TODO: change this to something real, e.g. "AAPL", "BTC-USD"

TRAIN_START_DATE = "2021-02-09"
TRAIN_END_DATE = "2021-09-30"
EVAL_START_DATE = "2021-10-01"
EVAL_END_DATE = "2021-11-12"


def build_dataset(ticker, start, end, filename):
    # 1. Download hourly OHLCV data
    df = yf.Ticker(ticker).history(
        start=start,
        end=end,
        interval="60m"
    )

    # 2. Clean up
    df = df.drop(["Dividends", "Stock Splits"], axis=1)
    df["Volume"] = df["Volume"].astype(int)

    # 3. Add some basic features
    df.ta.log_return(append=True, length=16)
    df.ta.rsi(append=True, length=14)
    df.ta.macd(append=True, fast=12, slow=26)

    # 4. Move Datetime from index to column
    df = df.reset_index()

    # 5. Save
    df.to_csv(filename, index=False)
    print(f"Saved {filename} with {len(df)} rows")


build_dataset(TICKER, TRAIN_START_DATE, TRAIN_END_DATE, "training.csv")
build_dataset(TICKER, EVAL_START_DATE, EVAL_END_DATE, "evaluation.csv")
```
Once the script finishes, `training.csv` and `evaluation.csv` appear in the working directory. They contain the raw OHLCV data plus a few precomputed indicators, and this is the data the RL model will be trained on.
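To confirm what the model will actually see, a quick sanity check over the generated file can help; the indicator column names shown in the comment are what pandas_ta typically produces with the settings above, so verify them on your own run:

```python
import pandas as pd

# Peek at the generated training data
df = pd.read_csv("training.csv", parse_dates=["Datetime"])

print(df.shape)
print(df.columns.tolist())
# Typically something like:
# ['Datetime', 'Open', 'High', 'Low', 'Close', 'Volume',
#  'LOGRET_16', 'RSI_14', 'MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9']

print(df.tail(3))
```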
Building the TensorTrade Environment
Reinforcement learning can't consume a CSV file directly. It needs a standard interactive **environment**: something that exposes the current state, accepts the agent's action, and feeds back a reward.
TensorTrade breaks this process into modules:
- `Instrument`: defines what is traded (e.g. USD, TTRD).
- `Wallet`: tracks the balance of a single asset.
- `Portfolio`: a collection of wallets.
- `Stream`/`DataFeed`: handles the feature data streams.
- `reward_scheme`/`action_scheme`: define what actions are available and how their outcomes are scored.
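To get a feel for the `Stream`/`DataFeed` abstraction before wiring up the full environment, here is a tiny standalone sketch (with made-up numbers): a `DataFeed` compiles a set of named streams and emits one dictionary per step.

```python
from tensortrade.feed.core import DataFeed, Stream

# Two named streams; the DataFeed zips them into one dict per step
close = Stream.source([101.0, 102.5, 101.8], dtype="float").rename("close")
rsi = Stream.source([55.0, 61.2, 47.9], dtype="float").rename("rsi")

feed = DataFeed([close, rsi])
feed.compile()

print(feed.next())  # {'close': 101.0, 'rsi': 55.0}
print(feed.next())  # {'close': 102.5, 'rsi': 61.2}
```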
```bash
pip install tensortrade
```
Below is an environment factory function. It is deliberately lightweight so that it can be plugged into Ray later:
```python
import os

import pandas as pd

from tensortrade.feed.core import DataFeed, Stream
from tensortrade.oms.instruments import Instrument
from tensortrade.oms.exchanges import Exchange, ExchangeOptions
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.wallets import Wallet, Portfolio
import tensortrade.env.default as default


def create_env(config):
    """
    Build a TensorTrade environment from a CSV.

    config needs:
      - csv_filename
      - window_size
      - reward_window_size
      - max_allowed_loss
    """
    # 1. Read the dataset
    dataset = (
        pd.read_csv(config["csv_filename"], parse_dates=["Datetime"])
        .fillna(method="backfill")
        .fillna(method="ffill")
    )

    # 2. Price stream (we'll trade on Close)
    commission = 0.0035  # 0.35%, tweak this to your broker
    price = Stream.source(
        list(dataset["Close"]), dtype="float"
    ).rename("USD-TTRD")
    options = ExchangeOptions(commission=commission)
    exchange = Exchange("TTSE", service=execute_order, options=options)(price)

    # 3. Instruments and wallets
    USD = Instrument("USD", 2, "US Dollar")
    TTRD = Instrument("TTRD", 2, "TensorTrade Corp")  # just a label

    cash_wallet = Wallet(exchange, 1000 * USD)   # start with $1000
    asset_wallet = Wallet(exchange, 0 * TTRD)    # start with zero TTRD
    portfolio = Portfolio(USD, [cash_wallet, asset_wallet])

    # 4. Renderer feed (optional, useful for plotting later)
    renderer_feed = DataFeed([
        Stream.source(list(dataset["Datetime"])).rename("date"),
        Stream.source(list(dataset["Open"]), dtype="float").rename("open"),
        Stream.source(list(dataset["High"]), dtype="float").rename("high"),
        Stream.source(list(dataset["Low"]), dtype="float").rename("low"),
        Stream.source(list(dataset["Close"]), dtype="float").rename("close"),
        Stream.source(list(dataset["Volume"]), dtype="float").rename("volume"),
    ])
    renderer_feed.compile()

    # 5. Feature feed for the RL agent
    features = []
    # Skip Datetime (first column) and stream everything else
    for col in dataset.columns[1:]:
        s = Stream.source(list(dataset[col]), dtype="float").rename(col)
        features.append(s)
    feed = DataFeed(features)
    feed.compile()

    # 6. Reward and action scheme
    reward_scheme = default.rewards.SimpleProfit(
        window_size=config["reward_window_size"]
    )
    action_scheme = default.actions.BSH(
        cash=cash_wallet,
        asset=asset_wallet
    )

    # 7. Put everything together in an environment
    env = default.create(
        portfolio=portfolio,
        action_scheme=action_scheme,
        reward_scheme=reward_scheme,
        feed=feed,
        renderer=[],
        renderer_feed=renderer_feed,
        window_size=config["window_size"],
        max_allowed_loss=config["max_allowed_loss"]
    )
    return env
```
With that, the rules of the "game" are set: observe the last N candles plus indicators (the state), decide whether to buy, sell, or hold (the action), and try to maximize profit over a window of time (the reward).
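Before handing the environment over to RLlib, it is worth a quick smoke test with random actions. This is a minimal sketch assuming the gym-style `reset`/`step` interface exposed by TensorTrade's default environment:

```python
# Drive the environment with random actions for a few steps
config = {
    "csv_filename": "training.csv",
    "window_size": 14,
    "reward_window_size": 7,
    "max_allowed_loss": 0.10,
}

env = create_env(config)
obs = env.reset()

for step in range(5):
    action = env.action_space.sample()           # random buy/sell/hold decision
    obs, reward, done, info = env.step(action)
    print(f"step={step} action={action} reward={reward:.6f} done={done}")
    if done:
        break
```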
Training the Model with Ray RLlib and PPO
With the underlying environment in place, Ray RLlib takes over the core RL logic.
We use PPO (Proximal Policy Optimization), which performs well in both continuous-control and discrete action spaces. To look for a better configuration, we also run a simple hyperparameter grid search over the network architecture, the learning rate, and the minibatch size.
```bash
pip install "ray[rllib]"
```
The training script:
```python
import os

import ray
from ray import tune
from ray.tune.registry import register_env

from your_module import create_env  # wherever you defined create_env

# Some hyperparameter grids to try
FC_SIZE = tune.grid_search([
    [256, 256],
    [1024],
    [128, 64, 32],
])
LEARNING_RATE = tune.grid_search([
    0.001,
    0.0005,
    0.00001,
])
MINIBATCH_SIZE = tune.grid_search([
    5,
    10,
    20,
])

cwd = os.getcwd()

# Register our custom environment with RLlib
register_env("MyTrainingEnv", lambda cfg: create_env(cfg))

env_config_training = {
    "window_size": 14,
    "reward_window_size": 7,
    "max_allowed_loss": 0.10,  # cut episodes early if loss > 10%
    "csv_filename": os.path.join(cwd, "training.csv"),
}
env_config_evaluation = {
    "max_allowed_loss": 1.00,
    "csv_filename": os.path.join(cwd, "evaluation.csv"),
}

ray.init(ignore_reinit_error=True)

analysis = tune.run(
    run_or_experiment="PPO",
    name="MyExperiment1",
    metric="episode_reward_mean",
    mode="max",
    stop={
        "training_iteration": 5,  # small for demo, increase in real runs
    },
    config={
        "env": "MyTrainingEnv",
        "env_config": env_config_training,
        "log_level": "WARNING",
        "framework": "torch",  # or "tf"
        "ignore_worker_failures": True,
        "num_workers": 1,
        "num_envs_per_worker": 1,
        "num_gpus": 0,
        "clip_rewards": True,
        "lr": LEARNING_RATE,
        "gamma": 0.50,  # discount factor
        "observation_filter": "MeanStdFilter",
        "model": {
            "fcnet_hiddens": FC_SIZE,
        },
        "sgd_minibatch_size": MINIBATCH_SIZE,
        "evaluation_interval": 1,
        "evaluation_config": {
            "env_config": env_config_evaluation,
            "explore": False,  # no exploration during evaluation
        },
    },
    num_samples=1,
    keep_checkpoints_num=10,
    checkpoint_freq=1,
)
```
This code essentially runs a tournament of trading bots. Ray trains multiple PPO agents in parallel, one per hyperparameter combination, tracks their mean episode reward, and keeps checkpoints of the best performers for later use.
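Once training finishes, the best checkpoint can be restored and replayed on the out-of-sample data. This is a sketch assuming the older Ray 1.x-style RLlib API that matches the `tune.run` call above; the config merge below simply reuses the training window settings for the evaluation environment:

```python
from ray.rllib.agents.ppo import PPOTrainer

# Locate the best trial and its checkpoint from the tune results
best_trial = analysis.get_best_trial("episode_reward_mean", mode="max")
checkpoint_path = analysis.get_best_checkpoint(
    best_trial, metric="episode_reward_mean", mode="max"
)

# Evaluation env config: reuse the training window settings, swap in the eval CSV
env_config_eval = {**env_config_training, **env_config_evaluation}

# Rebuild a trainer with the winning trial's hyperparameters and load the weights
config = best_trial.config.copy()
config.update({
    "env_config": env_config_eval,
    "num_workers": 0,
    "explore": False,
})
agent = PPOTrainer(env="MyTrainingEnv", config=config)
agent.restore(checkpoint_path)

# Roll the trained policy through the evaluation data
env = create_env(env_config_eval)
obs = env.reset()
done, total_reward = False, 0.0
while not done:
    action = agent.compute_action(obs)
    obs, reward, done, info = env.step(action)
    total_reward += reward

print(f"Evaluation episode reward: {total_reward:.4f}")
print(f"Final net worth: {env.action_scheme.portfolio.net_worth}")
```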
A Custom Reward Scheme (PBR)
The default `SimpleProfit` reward is simple, but in practice it is often too coarse. Sometimes the reward function needs to be reshaped around the specific trading logic. Take a position-based reward scheme, **PBR (Position-Based Reward)**, as an example:
- Maintain the current position state (long or short).
- Watch the price change.
- Reward = price change × position direction.
If the price rises while you are long, you get positive feedback; if the price falls while you are short, you also get positive feedback. The reverse cases are penalized.
```python
from tensortrade.env.default.rewards import TensorTradeRewardScheme
from tensortrade.feed.core import DataFeed, Stream


class PBR(TensorTradeRewardScheme):
    """
    Position-Based Reward (PBR)

    Rewards the agent based on price changes and its current position.
    """

    registered_name = "pbr"

    def __init__(self, price: Stream):
        super().__init__()
        self.position = -1  # start out of the asset (BSH starts in cash)

        # Price differences
        r = Stream.sensor(price, lambda p: p.value, dtype="float").diff()

        # Position stream
        position = Stream.sensor(self, lambda rs: rs.position, dtype="float")

        # Reward = price_change * position
        reward = (r * position).fillna(0).rename("reward")

        self.feed = DataFeed([reward])
        self.feed.compile()

    def on_action(self, action: int):
        # BSH mapping: action 0 = hold cash (flat), action 1 = hold the asset (long)
        self.position = -1 if action == 0 else 1

    def get_reward(self, portfolio):
        return self.feed.next()["reward"]

    def reset(self):
        self.position = -1
        self.feed.reset()
```
Wiring it in is straightforward: inside `create_env`, replace the original `reward_scheme`:

```python
reward_scheme = PBR(price)
```
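One caveat, based on how PBR and BSH are wired together in TensorTrade's own examples: `on_action` is only invoked if the reward scheme is attached to the action scheme as a listener, so the `BSH` construction in `create_env` should also become:

```python
# Attach the reward scheme so PBR.on_action() fires on every agent action
action_scheme = default.actions.BSH(
    cash=cash_wallet,
    asset=asset_wallet
).attach(reward_scheme)
```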
The benefit of this change is denser feedback. The agent no longer has to wait until it closes a position to find out whether it made money; every step produces a signal about whether it is positioned on the right side of the move.
Next Steps and Suggestions
Getting this pipeline to run end to end is only the beginning. Making it genuinely usable takes a lot more work, for example:
- Swap in real data: `TTRD` in the code is just a placeholder; replace it with a real instrument (a stock, crypto, or an index).
- Feature engineering: RSI and MACD are only a starting point; try ATR, Bollinger Bands, or features from longer timeframes.
- Hyperparameter tuning: `gamma` (the discount factor) and `window_size` (the observation window) have a huge impact on the strategy's behavior and are worth sweeping carefully.
- Benchmarking: this step matters most. Compare the trained RL policy against Buy & Hold, and even against a random policy (see the baseline sketch after this list). If it can't beat a random policy, go back and re-examine everything.
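As a minimal baseline, the Buy & Hold return over the evaluation window can be computed straight from the CSV built earlier; comparing the agent's final net worth against this number (and against a random policy) is a first reality check:

```python
import pandas as pd

# Buy & Hold baseline over the evaluation window
df = pd.read_csv("evaluation.csv", parse_dates=["Datetime"])

initial_cash = 1000.0                 # same starting cash as the environment
first_close = df["Close"].iloc[0]
last_close = df["Close"].iloc[-1]

bh_return = last_close / first_close - 1.0
bh_final_value = initial_cash * (1.0 + bh_return)

print(f"Buy & Hold return: {bh_return:.2%}")
print(f"Buy & Hold final value on ${initial_cash:,.0f}: ${bh_final_value:,.2f}")
```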
Finally, remember that this is research only, so don't take it straight to live trading. A model dominating its own training set is the norm; surviving out-of-sample testing and paper trading is what actually counts.
https://github.com/tensortrade-org/tensortrade
Author: CodeBun