经过initialize的一次性配置和before_trading_start的每日盘前准备本文将详细讲解承载所有核心交易逻辑的函数handle_data()这个函数
1. handle_data(context, data) 函数的核心定位
handle_data()函数会在交易时段内,根据我们设定的频率(按天或按分钟)被平台周期性地反复调用
策略所有的行情分析、信号判断、交易决策和下单执行,都集中在这个函数内完成可以说,策略的绝大部分运行时长,都在handle_data的循环调用中度过
2. handle_data 的两大核心参数
handle_data函数带有两个至关重要的参数:context和data,它们是策略决策的数据基础
3. handle_data 的标准工作流
为了方便学习使用一个逻辑清晰的handle_data函数;我们可以遵循以下四步走。这有助于构建出结构稳定、易于维护的策略
1 数据获取:根据策略需要,获取用于计算指标的历史数据(如使用get_history),以及用于决策的当前价格(直接从data对象获取)
2 信号生成:基于获取的数据,进行指标计算(如均线、MACD),并根据预设的交易逻辑判断是否产生了买入或卖出信号
3 持仓判断:在产生交易信号后,需要检查当前的持仓状态(如使用context.portfolio.positions或get_position),以决定下一步是该开仓加仓、减仓还是清仓
4 执行下单:根据信号和持仓状态,调用下单函数(如order、order_target)执行最终的交易操作
4. 完整的 handle_data 代码示例(双均线策略)
下面的代码完整地演示了在一个handle_data函数中,如何实现一个经典的双均线(金叉买入,死叉卖出)策略
def initialize(context):
set_benchmark("000300.SS")
try:
set_commission(commission_ratio=0.0003, min_commission=5.0, type="STOCK")
set_slippage(slippage=0.0024)
except Exception as e:
log.info(f"提示:当前环境可能不支持 set_commission/set_slippage:{e}")
g.security_list = ["600519.SS", "000001.SZ"]
set_universe(g.security_list)
g.short_ma_period = 10
g.long_ma_period = 30
g.traded_today = False
g.today_options = list(g.security_list)
g._last_debug_day = None
log.info("策略初始化完成。")
def before_trading_start(context, data):
g.traded_today = False
dt = getattr(context, "current_dt", None)
date_str = dt.strftime("%Y-%m-%d") if dt else "unknown_date"
log.info(f"进入交易日 {date_str},重置每日状态。")
sec_list = getattr(g, "security_list", None) or ["600519.SS", "000001.SZ"]
g.security_list = sec_list
g.today_options = []
try:
hist = get_history(1, "1d", "close", security_list=sec_list, fq=None, include=False)
except TypeError:
hist = get_history(1, frequency="1d", field="close", security_list=sec_list, fq=None, include=False)
if hist is None:
g.today_options = list(sec_list)
else:
close_map = None
try:
import pandas as pd
if isinstance(hist, pd.DataFrame):
cols = set(hist.columns)
if ("code" in cols) and ("close" in cols):
m = {}
for s in sec_list:
x = hist.loc[hist["code"] == s, "close"]
if len(x) > 0:
m[s] = float(x.iloc[-1])
close_map = m
elif all(s in cols for s in sec_list):
close_map = {s: float(hist[s].iloc[-1]) for s in sec_list}
elif isinstance(hist, pd.Series):
close_map = {sec_list[0]: float(hist.iloc[-1])}
except Exception:
close_map = None
if close_map is None:
g.today_options = list(sec_list)
else:
for s in sec_list:
if s in close_map and close_map[s] > 20:
g.today_options.append(s)
if not g.today_options:
g.today_options = list(sec_list)
log.info(f"今日备选股票池: {g.today_options}")
log.info(f"盘前可用资金: {context.portfolio.cash:.2f}")
def _extract_close_series(hist, security):
try:
import pandas as pd
if isinstance(hist, pd.Series):
return hist.dropna()
if isinstance(hist, pd.DataFrame):
cols = set(hist.columns)
if ("code" in cols) and ("close" in cols):
return hist.loc[hist["code"] == security, "close"].dropna()
if security in cols:
return hist[security].dropna()
if "close" in cols:
return hist["close"].dropna()
except Exception:
pass
try:
return hist["close"][security].dropna()
except Exception:
return None
def handle_data(context, data):
sec_list = getattr(g, "security_list", None) or ["600519.SS", "000001.SZ"]
g.security_list = sec_list
short_n = int(getattr(g, "short_ma_period", 10))
long_n = int(getattr(g, "long_ma_period", 30))
if not (0 < short_n < long_n):
log.info("均线参数不合法:要求 0 < short < long。")
return
candidates = getattr(g, "today_options", None) or sec_list
if not candidates:
return
security = candidates[0]
try:
hist = get_history(long_n + 1, "1d", "close", security_list=security, fq=None, include=False)
except TypeError:
hist = get_history(long_n + 1, frequency="1d", field="close", security_list=security, fq=None, include=False)
if hist is None:
return
close = _extract_close_series(hist, security)
if close is None or len(close) < long_n + 1:
log.info(f"{security} 历史数据不足或解析失败,跳过。")
return
s_ma_t = close.iloc[-short_n:].mean()
l_ma_t = close.iloc[-long_n:].mean()
close_prev = close.iloc[:-1]
s_ma_prev = close_prev.iloc[-short_n:].mean()
l_ma_prev = close_prev.iloc[-long_n:].mean()
golden_cross = (s_ma_prev l_ma_t)
death_cross = (s_ma_prev >= l_ma_prev) and (s_ma_t < l_ma_t)
pos = get_position(security)
pos_amount = getattr(pos, "amount", 0) or 0
dt = getattr(context, "current_dt", None)
d = dt.date() if dt else None
if getattr(g, "_last_debug_day", None) != d:
g._last_debug_day = d
log.info(
f"{security} | s_prev={s_ma_prev:.3f}, l_prev={l_ma_prev:.3f}, "
f"s={s_ma_t:.3f}, l={l_ma_t:.3f} | "
f"golden={golden_cross}, death={death_cross} | pos_amount={pos_amount}"
)
try:
current_price = data[security].price
except Exception:
try:
current_price = data[security]["close"]
except Exception:
current_price = None
target_amount = 100
if golden_cross and pos_amount == 0:
delta = target_amount - pos_amount
if delta != 0:
if current_price is not None:
oid = order(security, delta, limit_price=current_price)
else:
oid = order(security, delta)
log.info(f"金叉下单:{security} delta={delta} price={current_price} order_id={oid}")
g.traded_today = True
elif death_cross and pos_amount > 0:
delta = -pos_amount
if current_price is not None:
oid = order(security, delta, limit_price=current_price)
else:
oid = order(security, delta)
log.info(f"死叉下单:{security} delta={delta} price={current_price} order_id={oid}")
g.traded_today = True5. 注意事项
• 执行频率:handle_data的执行频率(日或分钟)对策略行为有决定性影响。分钟级策略反应更灵敏,但交易可能更频繁;日级策略则更关注长期趋势
• 避免重复交易:一个交易信号可能会在连续多个分钟Bar内持续满足条件。需要设计逻辑(如使用g.traded_today标记)来避免在同一信号下重复下单
• 数据完备性:在调用get_history获取数据时,应检查返回的数据长度是否满足计算指标的要求,避免因数据不足导致计算错误
handle_data是策略实现价值的核心所在。通过“数据获取 -> 信号生成 -> 持仓判断 -> 执行下单”的标准化流程,
我们便可以逻辑清晰的将复杂的交易思想,转化为清晰、可执行的代码
PTrade免费申请
PTradeQMT免费领取学习案例
PTradeQMT落地辅助
需要的朋友欢迎联系 ~~~
著作权归文章作者所有。