整理了一些常见的小案例和要点~分享给大家
场景1:防止重复下单
# 我们想买 200 股 600000.SH
# 但要先检查是否已经有太多买单在途
pending = order_system.get_total_pending_buy_volume('600000.SH')
if pending < 100:
# 只有不足 100 股的买单在途,可以继续买
xt_trader.order_stock(acc, '600000.SH', STOCK_BUY, 100, 8.0)
else:
logging.info(f"已经有 {pending} 股的买单在途,不再下单")
场景2:查询订单详情
# 查询某个订单的详细信息
order_info = order_system.get_order_info(order_id)
if order_info:
print(f"股票:{order_info['stock_code']}")
print(f"类型:{'买' if order_info['order_type'] == STOCK_BUY else '卖'}")
print(f"数量:{order_info['volume']}")
print(f"价格:{order_info['price']}")
print(f"状态:{order_info['status']}")
print(f"已成交:{order_info['traded_volume']}")
场景3:统计在途订单
# 查询所有在途订单
pending_orders = order_system.get_pending_orders()
logging.info(f"还有 {len(pending_orders)} 个订单在途")
# 查询所有已完成订单
completed_orders = order_system.get_completed_orders()
logging.info(f"已完成 {len(completed_orders)} 个订单")
# 获取统计信息
stats = order_system.get_statistics()
logging.info(f"总成交金额:{stats['total_amount']:.2f} 元")场景4:按股票查询
# 查询 600000.SH 的所有在途订单
pending_600000 = order_system.get_pending_orders('600000.SH')
logging.info(f"600000.SH 还有 {len(pending_600000)} 个订单在途")
# 查询 600000.SH 的所有已完成订单
completed_600000 = order_system.get_completed_orders('600000.SH')
logging.info(f"600000.SH 已完成 {len(completed_600000)} 个订单")场景5:订单状态一览
状态说明:
•48 (未报):订单还没发出去
•49 (待报):订单正在发送中
•50 (已报):订单已到交易所,等待成交
•55 (部成):订单部分成交了
•56 (已成):订单全部成交了
•54 (已撤):订单被撤销了
•57 (废单):交易所拒绝了这个订单
常见点6:记录订单
from xtquant import xtconstant
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
class OrderSystem:
def __init__(self):
self.orders = {} # 内存中存储订单信息
self.order_counter = 0 # 订单计数器
def add_order(self, order_id, stock_code, order_type, volume, price, price_type=xtconstant.FIX_PRICE,
strategy_name="", remark=""):
"""记录订单信息"""
try:
self.orders[order_id] = {
"stock_code": stock_code,
"order_type": order_type,
"volume": volume,
"price": price,
"price_type": price_type,
"strategy_name": strategy_name,
"remark": remark,
"status": "pending", # 初始状态
"create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.order_counter += 1
logging.info(f"订单记录成功,ID: {order_id}, 股票: {stock_code}, 数量: {volume}")
return True
except Exception as e:
logging.error(f"订单记录失败: {e}")
return False
def update_order_status(self, order_id, status, traded_volume=0):
"""更新订单状态"""
if order_id in self.orders:
self.orders[order_id]["status"] = status
self.orders[order_id]["update_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if traded_volume > 0:
self.orders[order_id]["traded_volume"] = traded_volume
logging.info(f"订单状态更新: {order_id} -> {status}")
return True
else:
logging.warning(f"订单不存在: {order_id}")
return False
def get_order(self, order_id):
"""获取订单信息"""
return self.orders.get(order_id)
def get_all_orders(self):
"""获取所有订单"""
return self.orders
def get_pending_orders(self):
"""获取待处理订单"""
return {oid: order for oid, order in self.orders.items() if order["status"] == "pending"}
# 使用示例
if __name__ == "__main__":
# 初始化订单系统
order_system = OrderSystem()
# 模拟下一单
# order_id = xt_trader.order_stock(acc, "600000.SH", xtconstant.STOCK_BUY, 100, xtconstant.FIX_PRICE, 10.5, "strategy1", "test")
# 模拟订单ID(实际从xt_trader.order_stock获取)
order_id = 100001
if order_id > 0:
success = order_system.add_order(
order_id=order_id,
stock_code="600000.SH",
order_type=xtconstant.STOCK_BUY,
volume=100,
price=10.5,
price_type=xtconstant.FIX_PRICE,
strategy_name="test_strategy",
remark="测试订单"
)
if success:
logging.info(f"订单记录成功,ID: {order_id}")
# 模拟订单状态更新
order_system.update_order_status(order_id, "submitted")
order_system.update_order_status(order_id, "partially_filled", 50)
order_system.update_order_status(order_id, "filled", 100)
else:
logging.error("订单记录失败")
else:
logging.error(f"下单失败,错误码: {order_id}")常见点7:在回调中执行耗时操作
import threading
import time
import logging
from xtquant.xttrader import XtQuantTraderCallback
# 配置日志
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
class MyTraderCallback(XtQuantTraderCallback):
def __init__(self):
super().__init__()
self.orders = {} # 内存订单状态缓存
def on_stock_order(self, order):
"""
委托回报回调 - 使用多线程处理耗时操作
"""
try:
# 快速操作:更新内存状态(主线程执行)
self.update_order_status(order.order_id, order.order_status)
# 耗时操作放到线程中执行(避免阻塞主回调线程)
thread = threading.Thread(
target=self.process_order_async,
args=(order,),
name=f"OrderProcess-{order.order_id}"
)
thread.daemon = True # 设置为守护线程
thread.start()
except Exception as e:
logging.error(f"订单回调处理异常: {e}")
def update_order_status(self, order_id, status):
"""快速的内存操作"""
self.orders[order_id] = {
"status": status,
"update_time": time.time()
}
logging.debug(f"订单状态更新: {order_id} -> {status}")
def process_order_async(self, order):
"""耗时的异步处理操作"""
try:
# 模拟耗时操作(如数据库写入、网络请求等)
logging.info(f"开始异步处理订单: {order.order_id}")
time.sleep(2) # 模拟耗时操作
# 实际业务逻辑(数据库保存、消息推送等)
self.save_to_database(order)
self.send_notification(order)
logging.info(f"订单 {order.order_id} 异步处理完成")
except Exception as e:
logging.error(f"订单异步处理失败: {order.order_id}, 错误: {e}")
def save_to_database(self, order):
"""模拟数据库保存操作"""
# 这里实现具体的数据库保存逻辑
# 例如: db.insert_order(order.to_dict())
logging.debug(f"订单数据已保存到数据库: {order.order_id}")
def send_notification(self, order):
"""模拟发送通知操作"""
# 这里实现消息推送、邮件通知等逻辑
logging.debug(f"订单通知已发送: {order.order_id}")
# 使用示例
if __name__ == "__main__":
# 创建回调实例
callback = MyTraderCallback()
# 订单回调
class MockOrder:
def __init__(self, order_id, order_status):
self.order_id = order_id
self.order_status = order_status
# 测试回调处理
test_order = MockOrder("123456", 0) # 0表示已报状态
callback.on_stock_order(test_order)
# 等待异步线程完成
time.sleep(3)
logging.info("测试完成")QMT/miniQMT免费申请
QMT免费领取学习案例
QMT落地辅助策略代写服务
需要的朋友欢迎联系 ~~~
著作权归文章作者所有。