带你详细了解miniQMT的xttrader模块 (第八篇) 一些常见使用示例

QUANT 2025-12-15 17:08:27 22 举报

整理了一些常见的小案例和要点~分享给大家


场景1:防止重复下单

# 我们想买 200 股 600000.SH
# 但要先检查是否已经有太多买单在途
pending = order_system.get_total_pending_buy_volume('600000.SH')
if pending < 100:
   # 只有不足 100 股的买单在途,可以继续买
   xt_trader.order_stock(acc, '600000.SH', STOCK_BUY, 100, 8.0)
else:
   logging.info(f"已经有 {pending} 股的买单在途,不再下单")


场景2:查询订单详情

# 查询某个订单的详细信息
order_info = order_system.get_order_info(order_id)
if order_info:
   print(f"股票:{order_info['stock_code']}")
   print(f"类型:{'买' if order_info['order_type'] == STOCK_BUY else '卖'}")
   print(f"数量:{order_info['volume']}")
   print(f"价格:{order_info['price']}")
   print(f"状态:{order_info['status']}")
   print(f"已成交:{order_info['traded_volume']}")


场景3:统计在途订单

# 查询所有在途订单
pending_orders = order_system.get_pending_orders()
logging.info(f"还有 {len(pending_orders)} 个订单在途")

# 查询所有已完成订单
completed_orders = order_system.get_completed_orders()
logging.info(f"已完成 {len(completed_orders)} 个订单")

# 获取统计信息
stats = order_system.get_statistics()
logging.info(f"总成交金额:{stats['total_amount']:.2f} 元")


场景4:按股票查询

# 查询 600000.SH 的所有在途订单
pending_600000 = order_system.get_pending_orders('600000.SH')
logging.info(f"600000.SH 还有 {len(pending_600000)} 个订单在途")

# 查询 600000.SH 的所有已完成订单
completed_600000 = order_system.get_completed_orders('600000.SH')
logging.info(f"600000.SH 已完成 {len(completed_600000)} 个订单")


场景5:订单状态一览

状态说明:
•48 (未报):订单还没发出去
•49 (待报):订单正在发送中
•50 (已报):订单已到交易所,等待成交
•55 (部成):订单部分成交了
•56 (已成):订单全部成交了
•54 (已撤):订单被撤销了
•57 (废单):交易所拒绝了这个订单


常见点6:记录订单

from xtquant import xtconstant
import logging
from datetime import datetime

# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


class OrderSystem:
    def __init__(self):
        self.orders = {}  # 内存中存储订单信息
        self.order_counter = 0  # 订单计数器

    def add_order(self, order_id, stock_code, order_type, volume, price, price_type=xtconstant.FIX_PRICE,
                  strategy_name="", remark=""):
        """记录订单信息"""
        try:
            self.orders[order_id] = {
                "stock_code": stock_code,
                "order_type": order_type,
                "volume": volume,
                "price": price,
                "price_type": price_type,
                "strategy_name": strategy_name,
                "remark": remark,
                "status": "pending",  # 初始状态
                "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            self.order_counter += 1
            logging.info(f"订单记录成功,ID: {order_id}, 股票: {stock_code}, 数量: {volume}")
            return True
        except Exception as e:
            logging.error(f"订单记录失败: {e}")
            return False

    def update_order_status(self, order_id, status, traded_volume=0):
        """更新订单状态"""
        if order_id in self.orders:
            self.orders[order_id]["status"] = status
            self.orders[order_id]["update_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if traded_volume > 0:
                self.orders[order_id]["traded_volume"] = traded_volume
            logging.info(f"订单状态更新: {order_id} -> {status}")
            return True
        else:
            logging.warning(f"订单不存在: {order_id}")
            return False

    def get_order(self, order_id):
        """获取订单信息"""
        return self.orders.get(order_id)

    def get_all_orders(self):
        """获取所有订单"""
        return self.orders

    def get_pending_orders(self):
        """获取待处理订单"""
        return {oid: order for oid, order in self.orders.items() if order["status"] == "pending"}


# 使用示例
if __name__ == "__main__":
    # 初始化订单系统
    order_system = OrderSystem()

    # 模拟下一单
    # order_id = xt_trader.order_stock(acc, "600000.SH", xtconstant.STOCK_BUY, 100, xtconstant.FIX_PRICE, 10.5, "strategy1", "test")

    # 模拟订单ID(实际从xt_trader.order_stock获取)
    order_id = 100001

    if order_id > 0:
        success = order_system.add_order(
            order_id=order_id,
            stock_code="600000.SH",
            order_type=xtconstant.STOCK_BUY,
            volume=100,
            price=10.5,
            price_type=xtconstant.FIX_PRICE,
            strategy_name="test_strategy",
            remark="测试订单"
        )

        if success:
            logging.info(f"订单记录成功,ID: {order_id}")

            # 模拟订单状态更新
            order_system.update_order_status(order_id, "submitted")
            order_system.update_order_status(order_id, "partially_filled", 50)
            order_system.update_order_status(order_id, "filled", 100)

        else:
            logging.error("订单记录失败")
    else:
        logging.error(f"下单失败,错误码: {order_id}")


常见点7:在回调中执行耗时操作

import threading
import time
import logging
from xtquant.xttrader import XtQuantTraderCallback

# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

class MyTraderCallback(XtQuantTraderCallback):
    def __init__(self):
        super().__init__()
        self.orders = {}  # 内存订单状态缓存
        
    def on_stock_order(self, order):
        """
        委托回报回调 - 使用多线程处理耗时操作
        """
        try:
            # 快速操作:更新内存状态(主线程执行)
            self.update_order_status(order.order_id, order.order_status)
            
            # 耗时操作放到线程中执行(避免阻塞主回调线程)
            thread = threading.Thread(
                target=self.process_order_async, 
                args=(order,),
                name=f"OrderProcess-{order.order_id}"
            )
            thread.daemon = True  # 设置为守护线程
            thread.start()
            
        except Exception as e:
            logging.error(f"订单回调处理异常: {e}")
    
    def update_order_status(self, order_id, status):
        """快速的内存操作"""
        self.orders[order_id] = {
            "status": status,
            "update_time": time.time()
        }
        logging.debug(f"订单状态更新: {order_id} -> {status}")
    
    def process_order_async(self, order):
        """耗时的异步处理操作"""
        try:
            # 模拟耗时操作(如数据库写入、网络请求等)
            logging.info(f"开始异步处理订单: {order.order_id}")
            time.sleep(2)  # 模拟耗时操作
            
            # 实际业务逻辑(数据库保存、消息推送等)
            self.save_to_database(order)
            self.send_notification(order)
            
            logging.info(f"订单 {order.order_id} 异步处理完成")
            
        except Exception as e:
            logging.error(f"订单异步处理失败: {order.order_id}, 错误: {e}")
    
    def save_to_database(self, order):
        """模拟数据库保存操作"""
        # 这里实现具体的数据库保存逻辑
        # 例如: db.insert_order(order.to_dict())
        logging.debug(f"订单数据已保存到数据库: {order.order_id}")
    
    def send_notification(self, order):
        """模拟发送通知操作"""
        # 这里实现消息推送、邮件通知等逻辑
        logging.debug(f"订单通知已发送: {order.order_id}")

# 使用示例
if __name__ == "__main__":
    # 创建回调实例
    callback = MyTraderCallback()
    
    # 订单回调
    class MockOrder:
        def __init__(self, order_id, order_status):
            self.order_id = order_id
            self.order_status = order_status
    
    # 测试回调处理
    test_order = MockOrder("123456", 0)  # 0表示已报状态
    callback.on_stock_order(test_order)
    
    # 等待异步线程完成
    time.sleep(3)
    logging.info("测试完成")


QMT/miniQMT免费申请

QMT免费领取学习案例

QMT落地辅助策略代写服务

需要的朋友欢迎联系  ~~~


尊重知识,尊重市场 1

著作权归文章作者所有。

最新回复 ( 0 )
发新帖
0
DEPRECATED: addslashes(): Passing null to parameter #1 ($string) of type string is deprecated (/data/user/htdocs/xiunophp/xiunophp.min.php:48)