项目环境,C++11,Qt6.10,通过使用本地python3.9环境调用datafeed.py脚本,datafeed.py调用迅投的初始化函数:
import sys
import os
import json
from datetime import datetime, timedelta, time
from collections.abc import Callable
import threading
#from filelock import FileLock, Timeout
#import pandas as pd
from xtquant import xtdata, xtdatacenter as xtdc
class SimpleDataFrame:
def __init__(self, data=None):
self.data = data or [] # 每行是dict:{"time": xxx, "open": xxx}
self.columns = [] # 列名:["time", "open", "high", ...]
def empty(self):
return len(self.data) == 0
# 全局配置
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "xt_config.json")
XT_INITED = False
LOCK = None
INTERVAL_VT2XT = {
"MINUTE": "1m",
"DAILY": "1d",
"TICK": "tick"
}
INTERVAL_ADJUSTMENT_MAP = {
"MINUTE": timedelta(minutes=1),
"DAILY": timedelta()
}
EXCHANGE_VT2XT = {
"SSE": "SH",
"SZSE": "SZ",
"BSE": "BJ",
"SHFE": "SF",
"CFFEX": "IF",
"INE": "INE",
"DCE": "DF",
"CZCE": "ZF",
"GFEX": "GF"
}
CHINA_TZ = datetime.now().astimezone().tzinfo # 兼容C++11无ZoneInfo的场景
def load_config() -> dict:
# 定义UTF-8 BOM头标识
UTF8_BOM = b'\xef\xbb\xbf'
try:
# 步骤1:以二进制模式读取文件(避免编码解析干扰,准确识别BOM头)
with open(CONFIG_PATH, "rb") as f:
file_content = f.read()
# 步骤2:判断并移除BOM头(若存在)
if file_content.startswith(UTF8_BOM):
file_content = file_content[3:] # 跳过前3个字节的BOM头
# 步骤3:将处理后的字节内容解码为UTF-8字符串,再解析JSON
config_str = file_content.decode("utf-8")
config = json.loads(config_str)
return config["xt_config"]
except FileNotFoundError:
raise Exception(f"配置文件不存在:{CONFIG_PATH}")
except json.JSONDecodeError as e:
raise Exception(f"JSON格式解析失败(可能是BOM处理异常或文件格式错误):{e}")
except Exception as e:
raise Exception(f"读取配置文件失败:{e}")
def get_file_path(filename: str) -> str:
return os.path.join(os.getcwd(), filename)
def get_lock(config: dict) -> bool:
"""获取文件锁"""
global LOCK
lock_filename = config.get("lock_filename", "xt_lock")
lock_filepath = get_file_path(lock_filename)
LOCK = FileLock(lock_filepath)
try:
LOCK.acquire(timeout=1)
return True
except Timeout:
return False
def init(output: Callable = print) -> dict:
result = {"success" : True,"error_msg" : ""}
"""初始化迅投数据中心"""
global XT_INITED
if XT_INITED:
output("迅投数据中心已初始化,无需重复执行")
return result
# 读取配置
config = load_config()
username = config["username"]
password = config["password"]
vip_address_list = config["vip_address_list"]
listen_port = config["listen_port"]
try:
# 基础配置
xtdata.enable_hello = False
output(f"username:{str(username)}")
output(f"password:{str(password)}")
output(f"listen_port:{str(listen_port)}")
output(f"vip_address_list:{str(vip_address_list[0])}")
# Token连接模式
if username != "client":
xtdc.set_token(password)
xtdc.set_allow_optmize_address(vip_address_list)
xtdc.set_future_realtime_mode(True)
xtdc.init(False)
xtdc.listen(port=listen_port)
# 验证连接
xtdata.get_instrument_detail("000001.SZ")
XT_INITED = True
output("迅投数据中心初始化成功!")
return result
except Exception as ex:
output(f"迅投数据中心初始化失败...:{str(ex)}")
result["success"] = False
result["error_msg"] = str(ex)
return result
# (后续扩展用)
def get_history_df(req: dict, output: Callable = print)->SimpleDataFrame:
"""获取历史数据DataFrame"""
symbol = req["symbol"]
exchange = req["exchange"]
start_dt = datetime.strptime(req["start"], "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(req["end"], "%Y-%m-%d %H:%M:%S")
interval = req["interval"]
xt_interval = INTERVAL_VT2XT.get(interval, None)
if not xt_interval:
output(f"不支持的时间周期:{interval}")
return pd.DataFrame()
end_dt += timedelta(1)
xt_symbol = f"{symbol}.{EXCHANGE_VT2XT[exchange]}"
# 处理期权后缀
if exchange in ("SSE", "SZSE") and len(symbol) > 6:
xt_symbol += "O"
# 下载并获取数据
start = start_dt.strftime("%Y%m%d%H%M%S")
end = end_dt.strftime("%Y%m%d%H%M%S")
xtdata.download_history_data(xt_symbol, xt_interval, start, end)
data = xtdata.get_local_data([], [xt_symbol], xt_interval, start, end, -1, "front_ratio", False)
df = SimpleDataFrame(data[xt_symbol])
return df
if __name__ == "__main__":
init(print)
当执行到xtdc.init(False)时,会一直卡在这里,也没有任何报错。配置信息用户名密码都没有问题IP列表端口等也没问题,因为通过相同的配置,通过vnpy纯python脚本调用迅投接口初始化是成功的。是哪里的问题呢?