这个问题比较常见,而它的本质是
当我们的策略变得复杂时,可能就需要同时管理 10 个、100 个订单。如果我们只是用一个变量存储 order_id,是很麻烦的
因此我们需要进行订单管理
我们用实际案例去演示如何使用订单管理系统来管理多个订单(详细的思路步骤都在下面的代码中标注好啦~可以自行翻看学习实操)
解决这个问题的核心思想就是:用一个字典来存储所有订单,按股票分类,随时可以查询
上代码!!
"""
明确我们的执行流程:
1. 创建订单管理系统
2. 同时下 3 个订单(可能是不同的股票,也可能是同一只股票的不同价格)
3. 实时监听订单状态变化,自动更新管理系统
4. 随时查询订单状态、统计在途数量等
5. 程序结束时打印所有订单的摘要
"""
import time
import logging
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant.xtconstant import STOCK_BUY, STOCK_SELL, FIX_PRICE
# ============================================================================
# 第一步:配置日志
# ============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# ============================================================================
# 第二步:定义订单管理系统
# ============================================================================
class OrderManagementSystem:
"""
订单管理系统的作用:
1. 记录所有已下的订单
2. 追踪每个订单的实时状态
3. 提供查询和统计功能
"""
def __init__(self):
"""初始化订单管理系统"""
# 存储所有订单:{ order_id: order_info }
# order_info 包含:股票代码、订单类型、数量、价格、状态、已成交数等
self.orders = {}
# 按股票代码分类:{ stock_code: [order_id1, order_id2, ...] }
# 这样可以快速找到某只股票的所有订单
self.orders_by_stock = {}
# 记录订单的创建时间,方便后续分析
self.order_create_times = {}
def add_order(self, order_id, stock_code, order_type, volume, price):
"""
记录一个新下的订单
参数说明:
- order_id: 订单编号(由交易所生成)
- stock_code: 股票代码,如 '600000.SH'
- order_type: 订单类型,STOCK_BUY(买)或 STOCK_SELL(卖)
- volume: 订单数量,单位是股
- price: 订单价格,单位是元
这个方法在你下单成功后立刻调用。
"""
# 创建订单信息字典
self.orders[order_id] = {
'stock_code': stock_code,
'order_type': order_type,
'volume': volume,
'price': price,
'status': 'pending', # 初始状态:待报
'traded_volume': 0, # 已成交数量,初始为 0
'create_time': time.time()
}
# 按股票分类
if stock_code not in self.orders_by_stock:
self.orders_by_stock[stock_code] = []
self.orders_by_stock[stock_code].append(order_id)
# 打印日志
order_type_display = '买' if order_type == STOCK_BUY else '卖'
logging.info(
f"✓ 订单已记录:{order_id} - {stock_code} {order_type_display} "
f"{volume}股 @ {price}元"
)
def update_order_status(self, order_id, status, traded_volume):
"""
更新订单状态
参数说明:
- order_id: 订单编号
- status: 新的状态(字符串形式,如 'reported', 'part_traded', 'completed')
- traded_volume: 已成交的数量
这个方法在 on_stock_order 回调中调用,用来实时更新订单信息。
"""
if order_id in self.orders:
self.orders[order_id]['status'] = status
self.orders[order_id]['traded_volume'] = traded_volume
def get_order_info(self, order_id):
"""
查询单个订单的详细信息
返回值:
- 如果订单存在,返回订单信息字典
- 如果订单不存在,返回 None
"""
return self.orders.get(order_id)
def get_pending_orders(self, stock_code=None):
"""
获取所有还在途中的订单(未成交或部分成交)
参数说明:
- stock_code: 可选,如果指定则只返回该股票的订单
返回值:
- 在途订单的 order_id 列表
什么是"在途"?
- 待报:订单正在发送中
- 已报:订单已到交易所,等待成交
- 部成:订单部分成交了,还有部分在等待
"""
pending = []
for order_id, info in self.orders.items():
# 检查订单状态是否是"在途"
if info['status'] in ['pending', 'reported', 'part_traded']:
# 如果指定了股票代码,则只返回该股票的订单
if stock_code is None or info['stock_code'] == stock_code:
pending.append(order_id)
return pending
def get_completed_orders(self, stock_code=None):
"""
获取所有已完成的订单(全部成交或已撤销)
参数说明:
- stock_code: 可选,如果指定则只返回该股票的订单
返回值:
- 已完成订单的 order_id 列表
"""
completed = []
for order_id, info in self.orders.items():
if info['status'] in ['completed', 'canceled']:
if stock_code is None or info['stock_code'] == stock_code:
completed.append(order_id)
return completed
def get_total_pending_buy_volume(self, stock_code):
"""
计算某只股票还有多少买单在途
这是一个非常重要的方法!
它可以帮助你防止重复下单。
比如:
- 你想买 200 股 600000.SH
- 但已经有 100 股的买单在途
- 那么你最多还能再买 100 股
参数说明:
- stock_code: 股票代码
返回值:
- 该股票还未成交的买单总数量
"""
total = 0
# 遍历该股票的所有订单
for order_id in self.orders_by_stock.get(stock_code, []):
info = self.orders[order_id]
# 检查是否是买单 + 还在途中
if (info['order_type'] == STOCK_BUY and
info['status'] in ['pending', 'reported', 'part_traded']):
# 未成交的数量 = 总数 - 已成交数
untraded = info['volume'] - info['traded_volume']
total += untraded
return total
def get_total_pending_sell_volume(self, stock_code):
"""
计算某只股票还有多少卖单在途
逻辑同 get_total_pending_buy_volume,只是统计的是卖单。
"""
total = 0
for order_id in self.orders_by_stock.get(stock_code, []):
info = self.orders[order_id]
if (info['order_type'] == STOCK_SELL and
info['status'] in ['pending', 'reported', 'part_traded']):
untraded = info['volume'] - info['traded_volume']
total += untraded
return total
def print_summary(self):
"""
打印所有订单的摘要
这个方法在程序结束时调用,用来显示所有订单的最终状态。
"""
logging.info("=" * 80)
logging.info("【订单摘要】")
logging.info("=" * 80)
if not self.orders:
logging.info("没有任何订单")
return
# 遍历所有订单并打印
for order_id, info in self.orders.items():
# 将状态码转换为易读的状态名
status_display = {
'pending': '待报',
'reported': '已报',
'part_traded': '部成',
'completed': '已成',
'canceled': '已撤'
}.get(info['status'], '未知')
# 将订单类型转换为易读的名称
order_type_display = '买' if info['order_type'] == STOCK_BUY else '卖'
# 计算未成交数量
untraded = info['volume'] - info['traded_volume']
# 打印订单信息
logging.info(
f"订单 {order_id}: {info['stock_code']} {order_type_display} "
f"{info['volume']}股 @ {info['price']}元 [{status_display}] "
f"已成交 {info['traded_volume']}股,未成交 {untraded}股"
)
logging.info("=" * 80)
def get_statistics(self):
"""
获取订单统计信息
返回一个字典,包含:
- 总订单数
- 在途订单数
- 已完成订单数
- 总成交金额
等等
"""
total_orders = len(self.orders)
pending_orders = len(self.get_pending_orders())
completed_orders = len(self.get_completed_orders())
total_amount = 0
for info in self.orders.values():
total_amount += info['price'] * info['traded_volume']
return {
'total_orders': total_orders,
'pending_orders': pending_orders,
'completed_orders': completed_orders,
'total_amount': total_amount
}
# ============================================================================
# 第三步:定义回调类,处理订单状态变化
# ============================================================================
class SmartCallback(XtQuantTraderCallback):
"""
这个回调类的作用:
1. 监听所有订单状态变化
2. 自动更新订单管理系统中的信息
3. 打印日志,让用户知道发生了什么
"""
def __init__(self, order_system):
"""
初始化回调类
参数说明:
- order_system: OrderManagementSystem 实例,用来存储和管理订单
"""
super().__init__()
self.order_system = order_system
def on_stock_order(self, order):
"""
当订单状态发生变化时,这个方法被自动调用
参数说明:
- order: 订单对象,包含订单的所有信息
这个方法会被调用多次,每次订单状态变化都会触发。
"""
# 将 API 返回的状态码转换为易读的状态名
status_map = {
48: 'pending', # 未报
49: 'pending', # 待报
50: 'reported', # 已报
54: 'canceled', # 已撤
55: 'part_traded', # 部成
56: 'completed' # 已成
}
status = status_map.get(order.order_status, 'unknown')
# 更新订单管理系统中的订单状态
self.order_system.update_order_status(
order.order_id,
status,
order.traded_volume
)
# 根据不同的状态打印不同的日志
if status == 'reported':
logging.info(f"✓ 订单 {order.order_id} 已报到交易所")
elif status == 'part_traded':
remaining = order.order_volume - order.traded_volume
logging.info(
f"⚡ 订单 {order.order_id} 部分成交!"
f"已成交 {order.traded_volume}股,还剩 {remaining}股"
)
elif status == 'completed':
logging.info(
f"✓✓ 订单 {order.order_id} 全部成交!"
f"成交数量:{order.traded_volume}股"
)
elif status == 'canceled':
logging.info(
f"✗ 订单 {order.order_id} 已撤销"
f"已成交 {order.traded_volume}股"
)
def on_stock_trade(self, trade):
"""
当有成交发生时,这个方法被调用
参数说明:
- trade: 成交对象,包含成交的详细信息
注意:on_stock_order 和 on_stock_trade 的区别:
- on_stock_order:订单状态变化(可能多次)
- on_stock_trade:实际成交(每笔成交都会触发)
一个订单可能会有多个成交(比如分批成交),所以这个方法可能被调用多次。
"""
logging.info(
f"成交回报:{trade.stock_code} "
f"成交价 {trade.traded_price} × {trade.traded_volume} 股 = "
f"{trade.traded_amount} 元"
)
def on_order_error(self, order_error):
"""
如果下单失败,这个方法会被调用
参数说明:
- order_error: 错误对象,包含错误信息
"""
logging.error(
f"❌ 订单 {order_error.order_id} 下单失败!"
f"错误原因:{order_error.error_msg}"
)
# ============================================================================
# 第四步:主程序
# ============================================================================
def main():
"""
主程序流程:
1. 初始化交易对象和账户
2. 创建订单管理系统
3. 创建回调并注册
4. 启动交易引擎并连接
5. 订阅账户
6. 同时下多个订单
7. 等待订单状态变化
8. 打印最终统计
"""
# ========================================================================
# 配置账户信息
# ========================================================================
path = 'D:/XXXXQMT交易端模拟/userdata_mini/' # 【修改】为你的路径
account_id = '1110001111' # 【修改】为你的客户号
# ========================================================================
# 初始化交易对象
# ========================================================================
session_id = int(time.time())
xt_trader = XtQuantTrader(path, session_id)
acc = StockAccount(account_id)
# ========================================================================
# 创建订单管理系统
# ========================================================================
order_system = OrderManagementSystem()
logging.info("✓ 订单管理系统已创建")
# ========================================================================
# 创建回调并注册
# ========================================================================
callback = SmartCallback(order_system)
xt_trader.register_callback(callback)
logging.info("✓ 回调已注册")
# ========================================================================
# 启动交易引擎
# ========================================================================
xt_trader.start()
logging.info("✓ 交易引擎已启动")
# ========================================================================
# 连接到 MiniQMT
# ========================================================================
if xt_trader.connect() != 0:
logging.error("❌ 连接失败!请检查 MiniQMT 是否已启动并登录")
return
logging.info("✓ 连接成功!")
# ========================================================================
# 订阅账户
# ========================================================================
if xt_trader.subscribe(acc) != 0:
logging.error("❌ 订阅失败!")
return
logging.info("✓ 订阅成功!")
time.sleep(1)
# ========================================================================
# 准备下单
# ========================================================================
logging.info("=" * 80)
logging.info("【准备同时下多个订单】")
logging.info("=" * 80)
# 定义要下的订单列表
# 格式:(股票代码, 订单类型, 数量, 价格)
orders_to_place = [
('600000.SH', STOCK_BUY, 100, 8.0), # 买 100 股浦发银行,价格 8.0 元
('600000.SH', STOCK_BUY, 50, 7.9), # 再买 50 股浦发银行,价格 7.9 元(更低的价格)
('601988.SH', STOCK_BUY, 100, 5.0), # 买 100 股银行股,价格 5.0 元
]
logging.info(f"准备下 {len(orders_to_place)} 个订单...")
# ========================================================================
# 执行下单
# ========================================================================
placed_order_ids = []
for stock_code, order_type, volume, price in orders_to_place:
# 下单
order_id = xt_trader.order_stock(
acc,
stock_code,
order_type,
volume,
FIX_PRICE,
price,
'multi_order_strategy', # 策略名称
f'order_for_{stock_code}' # 订单备注
)
# 检查下单是否成功
if order_id > 0:
# 下单成功,记录到订单管理系统
order_system.add_order(order_id, stock_code, order_type, volume, price)
placed_order_ids.append(order_id)
logging.info(f"✓ 下单成功,订单编号:{order_id}")
else:
# 下单失败
logging.error(f"❌ 下单失败,错误码:{order_id}")
logging.info(f"共成功下单 {len(placed_order_ids)} 个")
# ========================================================================
# 等待订单状态变化
# ========================================================================
logging.info("=" * 80)
logging.info("【等待订单状态变化】")
logging.info("=" * 80)
logging.info("现在程序会监听订单状态变化,10秒后自动停止(或按 Ctrl+C 停止)")
try:
start_time = time.time()
while time.time() - start_time < 10:
time.sleep(0.5)
# run_forever() 会处理回调
xt_trader.run_forever()
except KeyboardInterrupt:
logging.info("\n收到停止信号")
# ========================================================================
# 打印最终统计
# ========================================================================
logging.info("=" * 80)
logging.info("【最终统计】")
logging.info("=" * 80)
# 打印订单摘要
order_system.print_summary()
# 打印统计信息
stats = order_system.get_statistics()
logging.info(f"总订单数:{stats['total_orders']}")
logging.info(f"在途订单数:{stats['pending_orders']}")
logging.info(f"已完成订单数:{stats['completed_orders']}")
logging.info(f"总成交金额:{stats['total_amount']:.2f} 元")
# ========================================================================
# 查询示例
# ========================================================================
logging.info("=" * 80)
logging.info("【查询示例】")
logging.info("=" * 80)
# 查询 600000.SH 还有多少买单在途
pending_buy = order_system.get_total_pending_buy_volume('600000.SH')
logging.info(f"600000.SH 还有 {pending_buy} 股买单在途")
# 查询所有在途订单
pending_orders = order_system.get_pending_orders()
logging.info(f"还有 {len(pending_orders)} 个订单在途")
# 查询所有已完成订单
completed_orders = order_system.get_completed_orders()
logging.info(f"已完成 {len(completed_orders)} 个订单")
# 查询单个订单的详细信息
if placed_order_ids:
first_order_id = placed_order_ids[0]
order_info = order_system.get_order_info(first_order_id)
if order_info:
logging.info(f"订单 {first_order_id} 的详细信息:{order_info}")
# ========================================================================
# 停止交易引擎
# ========================================================================
xt_trader.stop()
logging.info("✓ 程序已停止")
if __name__ == "__main__":
main()日志如下:
QMT/miniQMT免费申请
QMT免费领取学习案例
QMT落地辅助策略代写服务
需要的朋友欢迎联系 ~~~
尊重知识,尊重市场
1
收藏的用户(0)
X
正在加载信息~
原创
著作权归文章作者所有。
