

TensorTrade: A Reinforcement Learning Framework for Quantitative Trading

Open a trading chart, pile on ten technical indicators, and then stare at the screen with no idea what to do next: every trader knows that scene. What if you instead handed the historical data to a computer and told it to learn by trial and error? Reward it when it makes money, penalize it when it loses, and let it learn from repeated attempts and failures until it arrives at a trading strategy that is, if not perfect, at least internally consistent.

That is the core idea behind TensorTrade.

TensorTrade is an open-source Python framework focused on building and training trading algorithms with reinforcement learning (RL).

Data Acquisition and Feature Engineering

Here we use `yfinance` to download the data and `pandas_ta` to compute the technical indicators. Log returns, RSI, and MACD serve as a few basic feature inputs.

  pip install yfinance pandas_ta

import yfinance as yf  
import pandas_ta as ta  
import pandas as pd  

# Pick your ticker  
TICKER = "TTRD"  # TODO: change this to something real, e.g. "AAPL", "BTC-USD"  
TRAIN_START_DATE = "2021-02-09"  
TRAIN_END_DATE   = "2021-09-30"  
EVAL_START_DATE  = "2021-10-01"  
EVAL_END_DATE    = "2021-11-12"  

def build_dataset(ticker, start, end, filename):  
    # 1. Download hourly OHLCV data  
    df = yf.Ticker(ticker).history(  
        start=start,  
        end=end,  
        interval="60m"  
    )  
    # 2. Clean up  
    df = df.drop(["Dividends", "Stock Splits"], axis=1)  
    df["Volume"] = df["Volume"].astype(int)  
    # 3. Add some basic features  
    df.ta.log_return(append=True, length=16)  
    df.ta.rsi(append=True, length=14)  
    df.ta.macd(append=True, fast=12, slow=26)  
    # 4. Move Datetime from index to column  
    df = df.reset_index()  
    # 5. Save  
    df.to_csv(filename, index=False)  
    print(f"Saved {filename} with {len(df)} rows")  

build_dataset(TICKER, TRAIN_START_DATE, TRAIN_END_DATE, "training.csv")  
build_dataset(TICKER, EVAL_START_DATE,  EVAL_END_DATE,  "evaluation.csv")

Once the script finishes, `training.csv` and `evaluation.csv` appear in the working directory, containing the raw OHLCV data plus a few precomputed indicators. This is the data the RL model will be trained on.
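As a quick sanity check (my own addition, not part of the original walkthrough), you can peek at what actually landed in the file; the exact indicator column names, such as LOGRET_16 or MACD_12_26_9, depend on your pandas_ta version:

import pandas as pd

df = pd.read_csv("training.csv", parse_dates=["Datetime"])
print(df.columns.tolist())  # OHLCV plus the appended indicator columns
print(df.isna().sum())      # the first rows are NaN while the indicators warm up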

Building the TensorTrade Environment

Reinforcement learning cannot consume a CSV file directly. It needs a standard interactive **environment**: something that exposes the current state, accepts the agent's action, and returns a reward.

TensorTrade breaks this process into modules:

  • Instrument: defines what is being traded (e.g. USD, TTRD).
  • Wallet: tracks the balance of one asset.
  • Portfolio: a collection of wallets.
  • Stream / DataFeed: handles the streams of feature data.
  • action_scheme / reward_scheme: defines what the agent can do and how each action is scored.

  pip install tensortrade

Below is an environment factory function, kept deliberately lightweight so it can easily be plugged into Ray later:

import os  
import pandas as pd  

from tensortrade.feed.core import DataFeed, Stream  
from tensortrade.oms.instruments import Instrument  
from tensortrade.oms.exchanges import Exchange, ExchangeOptions  
from tensortrade.oms.services.execution.simulated import execute_order  
from tensortrade.oms.wallets import Wallet, Portfolio  
import tensortrade.env.default as default  

def create_env(config):  
    """  
    Build a TensorTrade environment from a CSV.  
    config needs:  
      - csv_filename  
      - window_size  
      - reward_window_size  
      - max_allowed_loss  
    """  
    # 1. Read the dataset  
    dataset = (  
        pd.read_csv(config["csv_filename"], parse_dates=["Datetime"])  
        .bfill()  # backfill then forward-fill; fillna(method=...) is deprecated in newer pandas  
        .ffill()  
    )  
    # 2. Price stream (we'll trade on Close)  
    commission = 0.0035  # 0.35%, tweak this to your broker  
    price = Stream.source(  
        list(dataset["Close"]), dtype="float"  
    ).rename("USD-TTRD")  
    options = ExchangeOptions(commission=commission)  
    exchange = Exchange("TTSE", service=execute_order, options=options)(price)  
    # 3. Instruments and wallets  
    USD = Instrument("USD", 2, "US Dollar")  
    TTRD = Instrument("TTRD", 2, "TensorTrade Corp")  # just a label  
    cash_wallet = Wallet(exchange, 1000 * USD)  # start with $1000  
    asset_wallet = Wallet(exchange, 0 * TTRD)   # start with zero TTRD  
    portfolio = Portfolio(USD, [cash_wallet, asset_wallet])  
    # 4. Renderer feed (optional, useful for plotting later)  
    renderer_feed = DataFeed([  
        Stream.source(list(dataset["Datetime"])).rename("date"),  
        Stream.source(list(dataset["Open"]), dtype="float").rename("open"),  
        Stream.source(list(dataset["High"]), dtype="float").rename("high"),  
        Stream.source(list(dataset["Low"]), dtype="float").rename("low"),  
        Stream.source(list(dataset["Close"]), dtype="float").rename("close"),  
        Stream.source(list(dataset["Volume"]), dtype="float").rename("volume"),  
    ])  
    renderer_feed.compile()  
    # 5. Feature feed for the RL agent  
    features = []  
    # Skip Datetime (first column) and stream everything else  
    for col in dataset.columns[1:]:  
        s = Stream.source(list(dataset[col]), dtype="float").rename(col)  
        features.append(s)  
    feed = DataFeed(features)  
    feed.compile()  
    # 6. Reward and action scheme  
    reward_scheme = default.rewards.SimpleProfit(  
        window_size=config["reward_window_size"]  
    )  
    action_scheme = default.actions.BSH(  
        cash=cash_wallet,  
        asset=asset_wallet  
    )  
    # 7. Put everything together in an environment  
    env = default.create(  
        portfolio=portfolio,  
        action_scheme=action_scheme,  
        reward_scheme=reward_scheme,  
        feed=feed,  
        renderer=[],  
        renderer_feed=renderer_feed,  
        window_size=config["window_size"],  
        max_allowed_loss=config["max_allowed_loss"]  
    )  
    return env

With that, the rules of the "game" are set: observe the last N candles and indicators (state), decide whether to buy, sell, or hold (action), and try to maximize profit over a period of time (reward).
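Before handing the environment to RLlib, it is worth a throwaway rollout to confirm that the feed, wallets, and reward scheme are wired up correctly. This is a minimal sketch of my own, assuming the gym-style reset/step interface that TensorTrade's default environment exposes:

config = {
    "csv_filename": "training.csv",
    "window_size": 14,
    "reward_window_size": 7,
    "max_allowed_loss": 0.10,
}
env = create_env(config)

obs = env.reset()
done, steps, total_reward = False, 0, 0.0
while not done and steps < 50:
    action = env.action_space.sample()          # random buy/sell/hold
    obs, reward, done, info = env.step(action)
    total_reward += reward
    steps += 1
print(f"Ran {steps} random steps, cumulative reward: {total_reward:.4f}")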

Training the Model with Ray RLlib and PPO

With the underlying environment in place, Ray RLlib takes over the core RL logic.

We use PPO (Proximal Policy Optimization), which performs well in both continuous-control and discrete action spaces. To look for a better configuration, we also run a simple hyperparameter grid search over the network architecture, the learning rate, and the minibatch size.

  pip install "ray[rllib]"

The training script:

import os  
import ray  
from ray import tune  
from ray.tune.registry import register_env  

from your_module import create_env  # wherever you defined create_env  

# Some hyperparameter grids to try  
FC_SIZE = tune.grid_search([  
    [256, 256],  
    [1024],  
    [128, 64, 32],  
])  
LEARNING_RATE = tune.grid_search([  
    0.001,  
    0.0005,  
    0.00001,  
])  
MINIBATCH_SIZE = tune.grid_search([  
    5,  
    10,  
    20,  
])  
cwd = os.getcwd()  
# Register our custom environment with RLlib  
register_env("MyTrainingEnv", lambda cfg: create_env(cfg))  
env_config_training = {  
    "window_size": 14,  
    "reward_window_size": 7,  
    "max_allowed_loss": 0.10,  # cut episodes early if loss > 10%  
    "csv_filename": os.path.join(cwd, "training.csv"),  
}  
env_config_evaluation = {  
    "max_allowed_loss": 1.00,  
    "csv_filename": os.path.join(cwd, "evaluation.csv"),  
}  
ray.init(ignore_reinit_error=True)  
analysis = tune.run(  
    run_or_experiment="PPO",  
    name="MyExperiment1",  
    metric="episode_reward_mean",  
    mode="max",  
    stop={  
        "training_iteration": 5,  # small for demo, increase in real runs  
    },  
    config={  
        "env": "MyTrainingEnv",  
        "env_config": env_config_training,  
        "log_level": "WARNING",  
        "framework": "torch",     # or "tf"  
        "ignore_worker_failures": True,  
        "num_workers": 1,  
        "num_envs_per_worker": 1,  
        "num_gpus": 0,  
        "clip_rewards": True,  
        "lr": LEARNING_RATE,  
        "gamma": 0.50,            # discount factor  
        "observation_filter": "MeanStdFilter",  
        "model": {  
            "fcnet_hiddens": FC_SIZE,  
        },  
        "sgd_minibatch_size": MINIBATCH_SIZE,  
        "evaluation_interval": 1,  
        "evaluation_config": {  
            "env_config": env_config_evaluation,  
            "explore": False,     # no exploration during evaluation  
        },  
    },  
    num_samples=1,  
    keep_checkpoints_num=10,  
    checkpoint_freq=1,  
)

This code is essentially running a "trading-bot tournament": Ray trains multiple PPO agents in parallel over the defined parameter combinations, tracks their mean episode reward, and saves the best-performing checkpoints for later use.
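To actually use the winner, you can pull the best checkpoint out of the analysis object and replay the policy on the evaluation data. The sketch below is my own; it assumes an older RLlib release where ray.rllib.agents.ppo.PPOTrainer still exists (newer releases renamed it to ray.rllib.algorithms.ppo.PPO) and that create_env from the previous section is importable:

from ray.rllib.agents.ppo import PPOTrainer

best_trial = analysis.best_trial
checkpoint_path = analysis.get_best_checkpoint(
    best_trial, metric="episode_reward_mean", mode="max"
)

# Rebuild the agent with the winning trial's resolved config and load the weights
agent = PPOTrainer(env="MyTrainingEnv", config=best_trial.config)
agent.restore(checkpoint_path)

# One deterministic pass over the evaluation data
eval_config = {**env_config_training, **env_config_evaluation}
env = create_env(eval_config)
obs = env.reset()
done, total_reward = False, 0.0
while not done:
    action = agent.compute_action(obs, explore=False)
    obs, reward, done, info = env.step(action)
    total_reward += reward
print(f"Evaluation episode reward: {total_reward:.2f}")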

A Custom Reward Scheme (PBR)

The default `SimpleProfit` reward logic is simple, but in practice it is often too coarse. Sometimes we want to reshape the reward function around a specific trading logic, for example a position-based scheme, **PBR (Position-Based Reward)**:

  • Maintain the current position (long or short).
  • Watch the price change from one step to the next.
  • Reward = price change × position direction.

If the price rises while you are long, you get positive feedback; if the price falls while you are short, you also get positive feedback. The opposite cases are penalized.

from tensortrade.env.default.rewards import TensorTradeRewardScheme  
from tensortrade.feed.core import DataFeed, Stream  

class PBR(TensorTradeRewardScheme):  
    """  
    Position-Based Reward (PBR)  
    Rewards the agent based on price changes and its current position.  
    """  
    registered_name = "pbr"  
    def __init__(self, price: Stream):  
        super().__init__()  
        self.position = -1  # start flat/short  
        # Price differences  
        r = Stream.sensor(price, lambda p: p.value, dtype="float").diff()  
        # Position stream  
        position = Stream.sensor(self, lambda rs: rs.position, dtype="float")  
        # Reward = price_change * position  
        reward = (r * position).fillna(0).rename("reward")  
        self.feed = DataFeed([reward])  
        self.feed.compile()  
    def on_action(self, action: int):  
        # With BSH, action 0 means "hold cash" (treated as short here),  
        # anything else means "hold the asset" (long)  
        self.position = -1 if action == 0 else 1  
    def get_reward(self, portfolio):  
        return self.feed.next()["reward"]  
    def reset(self):  
        self.position = -1  
        self.feed.reset()

Plugging it in is simple: inside `create_env`, replace the original `reward_scheme` with:

reward_scheme = PBR(price)
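One detail that is easy to miss (this is how the PBR example in the TensorTrade documentation wires things up, not something spelled out here): `on_action` is only invoked for listeners attached to the action scheme, so the BSH scheme should attach the reward scheme as well, roughly:

action_scheme = default.actions.BSH(
    cash=cash_wallet,
    asset=asset_wallet
).attach(reward_scheme)  # without attach(), PBR.on_action() is never called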

The benefit of this change is denser feedback. The agent no longer has to wait until it closes a position to find out whether it made money; at every step it receives a signal about whether it was on the right side of the move.
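To make that concrete, here is a toy illustration in plain pandas (not TensorTrade code) of the price-change-times-position reward:

import pandas as pd

price = pd.Series([100.0, 101.0, 100.5, 102.0])
position = pd.Series([1, 1, -1, -1])    # long for the first move, then short
reward = price.diff().fillna(0) * position
print(reward.tolist())                  # [0.0, 1.0, 0.5, -1.5]: a reward on every bar, not only at exit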

Further Improvements and Suggestions

Getting this pipeline to run end to end is only the beginning. To make it genuinely usable, there is still plenty of work to do, for example:

  • Swap in real data: the TTRD ticker in the code is only a placeholder; replace it with a real instrument (a stock, crypto, or an index).
  • Feature engineering: RSI and MACD are just a starting point; try ATR, Bollinger Bands, or features from longer timeframes.
  • Hyperparameter tuning: gamma (the discount factor) and window_size (the observation window) have a huge influence on the strategy's behaviour and are worth sweeping.
  • Benchmarking: this is the most critical step. Compare the trained RL policy with Buy & Hold, and even with a random policy (a minimal buy-and-hold baseline is sketched after this list). If it cannot beat a random policy, go back and re-examine everything.
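For the benchmarking point, a buy-and-hold baseline can be as simple as comparing the first and last close of the evaluation file (my own sketch; it ignores commissions and assumes the CSV produced earlier):

import pandas as pd

eval_df = pd.read_csv("evaluation.csv")
bh_return = eval_df["Close"].iloc[-1] / eval_df["Close"].iloc[0] - 1
print(f"Buy & hold return over the evaluation window: {bh_return:.2%}")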

Finally, remember that this is research only, so do not take it straight to a live account. A model that dominates the training set is the norm; passing out-of-sample testing and paper trading is what actually proves anything.

https://github.com/tensortrade-org/tensortrade

Author: CodeBun
