带你详细了解miniQMT的xttrader模块 (第七篇) 要是我想同时下多个单,我该怎么管理它们呢?

QUANT 2025-12-13 16:58:59 29 举报

这个问题比较常见,而它的本质是


当我们的策略变得复杂时,可能就需要同时管理 10 个、100 个订单。如果我们只是用一个变量存储 order_id,是很麻烦的

因此我们需要进行订单管理

我们用实际案例去演示如何使用订单管理系统来管理多个订单(详细的思路步骤都在下面的代码中标注好啦~可以自行翻看学习实操)

解决这个问题的核心思想就是:用一个字典来存储所有订单,按股票分类,随时可以查询


上代码!!

"""
明确我们的执行流程:
1. 创建订单管理系统
2. 同时下 3 个订单(可能是不同的股票,也可能是同一只股票的不同价格)
3. 实时监听订单状态变化,自动更新管理系统
4. 随时查询订单状态、统计在途数量等
5. 程序结束时打印所有订单的摘要
"""

import time
import logging
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant.xtconstant import STOCK_BUY, STOCK_SELL, FIX_PRICE

# ============================================================================
# 第一步:配置日志
# ============================================================================

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


# ============================================================================
# 第二步:定义订单管理系统
# ============================================================================

class OrderManagementSystem:
    """
    订单管理系统的作用:
    1. 记录所有已下的订单
    2. 追踪每个订单的实时状态
    3. 提供查询和统计功能

       """

    def __init__(self):
        """初始化订单管理系统"""
        # 存储所有订单:{ order_id: order_info }
        # order_info 包含:股票代码、订单类型、数量、价格、状态、已成交数等
        self.orders = {}

        # 按股票代码分类:{ stock_code: [order_id1, order_id2, ...] }
        # 这样可以快速找到某只股票的所有订单
        self.orders_by_stock = {}

        # 记录订单的创建时间,方便后续分析
        self.order_create_times = {}

    def add_order(self, order_id, stock_code, order_type, volume, price):
        """
        记录一个新下的订单

        参数说明:
        - order_id: 订单编号(由交易所生成)
        - stock_code: 股票代码,如 '600000.SH'
        - order_type: 订单类型,STOCK_BUY(买)或 STOCK_SELL(卖)
        - volume: 订单数量,单位是股
        - price: 订单价格,单位是元

        这个方法在你下单成功后立刻调用。
        """
        # 创建订单信息字典
        self.orders[order_id] = {
            'stock_code': stock_code,
            'order_type': order_type,
            'volume': volume,
            'price': price,
            'status': 'pending',  # 初始状态:待报
            'traded_volume': 0,  # 已成交数量,初始为 0
            'create_time': time.time()
        }

        # 按股票分类
        if stock_code not in self.orders_by_stock:
            self.orders_by_stock[stock_code] = []
        self.orders_by_stock[stock_code].append(order_id)

        # 打印日志
        order_type_display = '买' if order_type == STOCK_BUY else '卖'
        logging.info(
            f"✓ 订单已记录:{order_id} - {stock_code} {order_type_display} "
            f"{volume}股 @ {price}元"
        )

    def update_order_status(self, order_id, status, traded_volume):
        """
        更新订单状态

        参数说明:
        - order_id: 订单编号
        - status: 新的状态(字符串形式,如 'reported', 'part_traded', 'completed')
        - traded_volume: 已成交的数量

        这个方法在 on_stock_order 回调中调用,用来实时更新订单信息。
        """
        if order_id in self.orders:
            self.orders[order_id]['status'] = status
            self.orders[order_id]['traded_volume'] = traded_volume

    def get_order_info(self, order_id):
        """
        查询单个订单的详细信息

        返回值:
        - 如果订单存在,返回订单信息字典
        - 如果订单不存在,返回 None
        """
        return self.orders.get(order_id)

    def get_pending_orders(self, stock_code=None):
        """
        获取所有还在途中的订单(未成交或部分成交)

        参数说明:
        - stock_code: 可选,如果指定则只返回该股票的订单

        返回值:
        - 在途订单的 order_id 列表

        什么是"在途"?
        - 待报:订单正在发送中
        - 已报:订单已到交易所,等待成交
        - 部成:订单部分成交了,还有部分在等待
        """
        pending = []
        for order_id, info in self.orders.items():
            # 检查订单状态是否是"在途"
            if info['status'] in ['pending', 'reported', 'part_traded']:
                # 如果指定了股票代码,则只返回该股票的订单
                if stock_code is None or info['stock_code'] == stock_code:
                    pending.append(order_id)
        return pending

    def get_completed_orders(self, stock_code=None):
        """
        获取所有已完成的订单(全部成交或已撤销)

        参数说明:
        - stock_code: 可选,如果指定则只返回该股票的订单

        返回值:
        - 已完成订单的 order_id 列表
        """
        completed = []
        for order_id, info in self.orders.items():
            if info['status'] in ['completed', 'canceled']:
                if stock_code is None or info['stock_code'] == stock_code:
                    completed.append(order_id)
        return completed

    def get_total_pending_buy_volume(self, stock_code):
        """
        计算某只股票还有多少买单在途

        这是一个非常重要的方法!
        它可以帮助你防止重复下单。

        比如:
        - 你想买 200 股 600000.SH
        - 但已经有 100 股的买单在途
        - 那么你最多还能再买 100 股

        参数说明:
        - stock_code: 股票代码

        返回值:
        - 该股票还未成交的买单总数量
        """
        total = 0
        # 遍历该股票的所有订单
        for order_id in self.orders_by_stock.get(stock_code, []):
            info = self.orders[order_id]
            # 检查是否是买单 + 还在途中
            if (info['order_type'] == STOCK_BUY and
                    info['status'] in ['pending', 'reported', 'part_traded']):
                # 未成交的数量 = 总数 - 已成交数
                untraded = info['volume'] - info['traded_volume']
                total += untraded
        return total

    def get_total_pending_sell_volume(self, stock_code):
        """
        计算某只股票还有多少卖单在途

        逻辑同 get_total_pending_buy_volume,只是统计的是卖单。
        """
        total = 0
        for order_id in self.orders_by_stock.get(stock_code, []):
            info = self.orders[order_id]
            if (info['order_type'] == STOCK_SELL and
                    info['status'] in ['pending', 'reported', 'part_traded']):
                untraded = info['volume'] - info['traded_volume']
                total += untraded
        return total

    def print_summary(self):
        """
        打印所有订单的摘要

        这个方法在程序结束时调用,用来显示所有订单的最终状态。
        """
        logging.info("=" * 80)
        logging.info("【订单摘要】")
        logging.info("=" * 80)

        if not self.orders:
            logging.info("没有任何订单")
            return

        # 遍历所有订单并打印
        for order_id, info in self.orders.items():
            # 将状态码转换为易读的状态名
            status_display = {
                'pending': '待报',
                'reported': '已报',
                'part_traded': '部成',
                'completed': '已成',
                'canceled': '已撤'
            }.get(info['status'], '未知')

            # 将订单类型转换为易读的名称
            order_type_display = '买' if info['order_type'] == STOCK_BUY else '卖'

            # 计算未成交数量
            untraded = info['volume'] - info['traded_volume']

            # 打印订单信息
            logging.info(
                f"订单 {order_id}: {info['stock_code']} {order_type_display} "
                f"{info['volume']}股 @ {info['price']}元 [{status_display}] "
                f"已成交 {info['traded_volume']}股,未成交 {untraded}股"
            )

        logging.info("=" * 80)

    def get_statistics(self):
        """
        获取订单统计信息

        返回一个字典,包含:
        - 总订单数
        - 在途订单数
        - 已完成订单数
        - 总成交金额
        等等
        """
        total_orders = len(self.orders)
        pending_orders = len(self.get_pending_orders())
        completed_orders = len(self.get_completed_orders())

        total_amount = 0
        for info in self.orders.values():
            total_amount += info['price'] * info['traded_volume']

        return {
            'total_orders': total_orders,
            'pending_orders': pending_orders,
            'completed_orders': completed_orders,
            'total_amount': total_amount
        }


# ============================================================================
# 第三步:定义回调类,处理订单状态变化
# ============================================================================

class SmartCallback(XtQuantTraderCallback):
    """
    这个回调类的作用:
    1. 监听所有订单状态变化
    2. 自动更新订单管理系统中的信息
    3. 打印日志,让用户知道发生了什么
    """

    def __init__(self, order_system):
        """
        初始化回调类

        参数说明:
        - order_system: OrderManagementSystem 实例,用来存储和管理订单
        """
        super().__init__()
        self.order_system = order_system

    def on_stock_order(self, order):
        """
        当订单状态发生变化时,这个方法被自动调用

        参数说明:
        - order: 订单对象,包含订单的所有信息

        这个方法会被调用多次,每次订单状态变化都会触发。
        """
        # 将 API 返回的状态码转换为易读的状态名
        status_map = {
            48: 'pending',  # 未报
            49: 'pending',  # 待报
            50: 'reported',  # 已报
            54: 'canceled',  # 已撤
            55: 'part_traded',  # 部成
            56: 'completed'  # 已成
        }

        status = status_map.get(order.order_status, 'unknown')

        # 更新订单管理系统中的订单状态
        self.order_system.update_order_status(
            order.order_id,
            status,
            order.traded_volume
        )

        # 根据不同的状态打印不同的日志
        if status == 'reported':
            logging.info(f"✓ 订单 {order.order_id} 已报到交易所")

        elif status == 'part_traded':
            remaining = order.order_volume - order.traded_volume
            logging.info(
                f"⚡ 订单 {order.order_id} 部分成交!"
                f"已成交 {order.traded_volume}股,还剩 {remaining}股"
            )

        elif status == 'completed':
            logging.info(
                f"✓✓ 订单 {order.order_id} 全部成交!"
                f"成交数量:{order.traded_volume}股"
            )

        elif status == 'canceled':
            logging.info(
                f"✗ 订单 {order.order_id} 已撤销"
                f"已成交 {order.traded_volume}股"
            )

    def on_stock_trade(self, trade):
        """
        当有成交发生时,这个方法被调用

        参数说明:
        - trade: 成交对象,包含成交的详细信息

        注意:on_stock_order 和 on_stock_trade 的区别:
        - on_stock_order:订单状态变化(可能多次)
        - on_stock_trade:实际成交(每笔成交都会触发)

        一个订单可能会有多个成交(比如分批成交),所以这个方法可能被调用多次。
        """
        logging.info(
            f"成交回报:{trade.stock_code} "
            f"成交价 {trade.traded_price} × {trade.traded_volume} 股 = "
            f"{trade.traded_amount} 元"
        )

    def on_order_error(self, order_error):
        """
        如果下单失败,这个方法会被调用

        参数说明:
        - order_error: 错误对象,包含错误信息
        """
        logging.error(
            f"❌ 订单 {order_error.order_id} 下单失败!"
            f"错误原因:{order_error.error_msg}"
        )


# ============================================================================
# 第四步:主程序
# ============================================================================

def main():
    """
    主程序流程:
    1. 初始化交易对象和账户
    2. 创建订单管理系统
    3. 创建回调并注册
    4. 启动交易引擎并连接
    5. 订阅账户
    6. 同时下多个订单
    7. 等待订单状态变化
    8. 打印最终统计
    """

    # ========================================================================
    # 配置账户信息
    # ========================================================================

    path = 'D:/XXXXQMT交易端模拟/userdata_mini/'  # 【修改】为你的路径
    account_id = '1110001111'  # 【修改】为你的客户号

    # ========================================================================
    # 初始化交易对象
    # ========================================================================

    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    acc = StockAccount(account_id)

    # ========================================================================
    # 创建订单管理系统
    # ========================================================================

    order_system = OrderManagementSystem()
    logging.info("✓ 订单管理系统已创建")

    # ========================================================================
    # 创建回调并注册
    # ========================================================================

    callback = SmartCallback(order_system)
    xt_trader.register_callback(callback)
    logging.info("✓ 回调已注册")

    # ========================================================================
    # 启动交易引擎
    # ========================================================================

    xt_trader.start()
    logging.info("✓ 交易引擎已启动")

    # ========================================================================
    # 连接到 MiniQMT
    # ========================================================================

    if xt_trader.connect() != 0:
        logging.error("❌ 连接失败!请检查 MiniQMT 是否已启动并登录")
        return

    logging.info("✓ 连接成功!")

    # ========================================================================
    # 订阅账户
    # ========================================================================

    if xt_trader.subscribe(acc) != 0:
        logging.error("❌ 订阅失败!")
        return

    logging.info("✓ 订阅成功!")
    time.sleep(1)

    # ========================================================================
    # 准备下单
    # ========================================================================

    logging.info("=" * 80)
    logging.info("【准备同时下多个订单】")
    logging.info("=" * 80)

    # 定义要下的订单列表
    # 格式:(股票代码, 订单类型, 数量, 价格)
    orders_to_place = [
        ('600000.SH', STOCK_BUY, 100, 8.0),  # 买 100 股浦发银行,价格 8.0 元
        ('600000.SH', STOCK_BUY, 50, 7.9),  # 再买 50 股浦发银行,价格 7.9 元(更低的价格)
        ('601988.SH', STOCK_BUY, 100, 5.0),  # 买 100 股银行股,价格 5.0 元
    ]

    logging.info(f"准备下 {len(orders_to_place)} 个订单...")

    # ========================================================================
    # 执行下单
    # ========================================================================

    placed_order_ids = []

    for stock_code, order_type, volume, price in orders_to_place:
        # 下单
        order_id = xt_trader.order_stock(
            acc,
            stock_code,
            order_type,
            volume,
            FIX_PRICE,
            price,
            'multi_order_strategy',  # 策略名称
            f'order_for_{stock_code}'  # 订单备注
        )

        # 检查下单是否成功
        if order_id > 0:
            # 下单成功,记录到订单管理系统
            order_system.add_order(order_id, stock_code, order_type, volume, price)
            placed_order_ids.append(order_id)
            logging.info(f"✓ 下单成功,订单编号:{order_id}")
        else:
            # 下单失败
            logging.error(f"❌ 下单失败,错误码:{order_id}")

    logging.info(f"共成功下单 {len(placed_order_ids)} 个")

    # ========================================================================
    # 等待订单状态变化
    # ========================================================================

    logging.info("=" * 80)
    logging.info("【等待订单状态变化】")
    logging.info("=" * 80)
    logging.info("现在程序会监听订单状态变化,10秒后自动停止(或按 Ctrl+C 停止)")

    try:
        start_time = time.time()
        while time.time() - start_time < 10:
            time.sleep(0.5)
            # run_forever() 会处理回调
            xt_trader.run_forever()
    except KeyboardInterrupt:
        logging.info("\n收到停止信号")

    # ========================================================================
    # 打印最终统计
    # ========================================================================

    logging.info("=" * 80)
    logging.info("【最终统计】")
    logging.info("=" * 80)

    # 打印订单摘要
    order_system.print_summary()

    # 打印统计信息
    stats = order_system.get_statistics()
    logging.info(f"总订单数:{stats['total_orders']}")
    logging.info(f"在途订单数:{stats['pending_orders']}")
    logging.info(f"已完成订单数:{stats['completed_orders']}")
    logging.info(f"总成交金额:{stats['total_amount']:.2f} 元")

    # ========================================================================
    # 查询示例
    # ========================================================================

    logging.info("=" * 80)
    logging.info("【查询示例】")
    logging.info("=" * 80)

    # 查询 600000.SH 还有多少买单在途
    pending_buy = order_system.get_total_pending_buy_volume('600000.SH')
    logging.info(f"600000.SH 还有 {pending_buy} 股买单在途")

    # 查询所有在途订单
    pending_orders = order_system.get_pending_orders()
    logging.info(f"还有 {len(pending_orders)} 个订单在途")

    # 查询所有已完成订单
    completed_orders = order_system.get_completed_orders()
    logging.info(f"已完成 {len(completed_orders)} 个订单")

    # 查询单个订单的详细信息
    if placed_order_ids:
        first_order_id = placed_order_ids[0]
        order_info = order_system.get_order_info(first_order_id)
        if order_info:
            logging.info(f"订单 {first_order_id} 的详细信息:{order_info}")

    # ========================================================================
    # 停止交易引擎
    # ========================================================================

    xt_trader.stop()
    logging.info("✓ 程序已停止")


if __name__ == "__main__":
    main()

日志如下:



QMT/miniQMT免费申请

QMT免费领取学习案例

QMT落地辅助策略代写服务

需要的朋友欢迎联系  ~~~



尊重知识,尊重市场 1

著作权归文章作者所有。

最新回复 ( 0 )
发新帖
0
DEPRECATED: addslashes(): Passing null to parameter #1 ($string) of type string is deprecated (/data/user/htdocs/xiunophp/xiunophp.min.php:48)