返回列表 发布新帖

C++调用迅投行情接口的初始化时,进程阻塞

40 0

项目环境,C++11,Qt6.10,通过使用本地python3.9环境调用datafeed.py脚本,datafeed.py调用迅投的初始化函数:

import sys
import os
import json
from datetime import datetime, timedelta, time
from collections.abc import Callable
import threading
#from filelock import FileLock, Timeout

#import pandas as pd
from xtquant import xtdata, xtdatacenter as xtdc
class SimpleDataFrame:
    def __init__(self, data=None):
        self.data = data or []  # 每行是dict:{"time": xxx, "open": xxx}
        self.columns = []       # 列名:["time", "open", "high", ...]

    def empty(self):
        return len(self.data) == 0
# 全局配置
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "xt_config.json")
XT_INITED = False
LOCK = None

INTERVAL_VT2XT = {
    "MINUTE": "1m",
    "DAILY": "1d",
    "TICK": "tick"
}
INTERVAL_ADJUSTMENT_MAP = {
    "MINUTE": timedelta(minutes=1),
    "DAILY": timedelta()
}
EXCHANGE_VT2XT = {
    "SSE": "SH",
    "SZSE": "SZ",
    "BSE": "BJ",
    "SHFE": "SF",
    "CFFEX": "IF",
    "INE": "INE",
    "DCE": "DF",
    "CZCE": "ZF",
    "GFEX": "GF"
}
CHINA_TZ = datetime.now().astimezone().tzinfo  # 兼容C++11无ZoneInfo的场景


def load_config() -> dict:
    # 定义UTF-8 BOM头标识
    UTF8_BOM = b'\xef\xbb\xbf'

    try:
        # 步骤1:以二进制模式读取文件(避免编码解析干扰,准确识别BOM头)
        with open(CONFIG_PATH, "rb") as f:
            file_content = f.read()

        # 步骤2:判断并移除BOM头(若存在)
        if file_content.startswith(UTF8_BOM):
            file_content = file_content[3:]  # 跳过前3个字节的BOM头

        # 步骤3:将处理后的字节内容解码为UTF-8字符串,再解析JSON
        config_str = file_content.decode("utf-8")
        config = json.loads(config_str)

        return config["xt_config"]

    except FileNotFoundError:
        raise Exception(f"配置文件不存在:{CONFIG_PATH}")
    except json.JSONDecodeError as e:
        raise Exception(f"JSON格式解析失败(可能是BOM处理异常或文件格式错误):{e}")
    except Exception as e:
        raise Exception(f"读取配置文件失败:{e}")


def get_file_path(filename: str) -> str:
    return os.path.join(os.getcwd(), filename)


def get_lock(config: dict) -> bool:
    """获取文件锁"""
    global LOCK
    lock_filename = config.get("lock_filename", "xt_lock")
    lock_filepath = get_file_path(lock_filename)
    LOCK = FileLock(lock_filepath)

    try:
        LOCK.acquire(timeout=1)
        return True
    except Timeout:
        return False


def init(output: Callable = print) -> dict:
    result = {"success" : True,"error_msg" : ""}

    """初始化迅投数据中心"""
    global XT_INITED
    if XT_INITED:
        output("迅投数据中心已初始化,无需重复执行")
        return result

    # 读取配置
    config = load_config()
    username = config["username"]
    password = config["password"]
    vip_address_list = config["vip_address_list"]
    listen_port = config["listen_port"]

    try:
        # 基础配置
        xtdata.enable_hello = False
        output(f"username:{str(username)}")
        output(f"password:{str(password)}")
        output(f"listen_port:{str(listen_port)}")
        output(f"vip_address_list:{str(vip_address_list[0])}")
        # Token连接模式
        if username != "client":  
            xtdc.set_token(password)
            xtdc.set_allow_optmize_address(vip_address_list)
            xtdc.set_future_realtime_mode(True)
            xtdc.init(False)
            xtdc.listen(port=listen_port)

        # 验证连接
        xtdata.get_instrument_detail("000001.SZ")
        XT_INITED = True
        output("迅投数据中心初始化成功!")
        return result
    except Exception as ex:
        output(f"迅投数据中心初始化失败...:{str(ex)}")
        result["success"] = False
        result["error_msg"] = str(ex)
        return result


# (后续扩展用)
def get_history_df(req: dict, output: Callable = print)->SimpleDataFrame:
    """获取历史数据DataFrame"""
    symbol = req["symbol"]
    exchange = req["exchange"]
    start_dt = datetime.strptime(req["start"], "%Y-%m-%d %H:%M:%S")
    end_dt = datetime.strptime(req["end"], "%Y-%m-%d %H:%M:%S")
    interval = req["interval"]

    xt_interval = INTERVAL_VT2XT.get(interval, None)
    if not xt_interval:
        output(f"不支持的时间周期:{interval}")
        return pd.DataFrame()

    end_dt += timedelta(1)
    xt_symbol = f"{symbol}.{EXCHANGE_VT2XT[exchange]}"

    # 处理期权后缀
    if exchange in ("SSE", "SZSE") and len(symbol) > 6:
        xt_symbol += "O"

    # 下载并获取数据
    start = start_dt.strftime("%Y%m%d%H%M%S")
    end = end_dt.strftime("%Y%m%d%H%M%S")
    xtdata.download_history_data(xt_symbol, xt_interval, start, end)
    data = xtdata.get_local_data([], [xt_symbol], xt_interval, start, end, -1, "front_ratio", False)
    df = SimpleDataFrame(data[xt_symbol])
    return df


if __name__ == "__main__":
    init(print)

当执行到xtdc.init(False)时,会一直卡在这里,也没有任何报错。配置信息用户名密码都没有问题IP列表端口等也没问题,因为通过相同的配置,通过vnpy纯python脚本调用迅投接口初始化是成功的。是哪里的问题呢?

回复

您需要登录后才可以回帖 登录 | 立即注册

主题

1

回帖

0

积分

0

客服专线

400-080-8112

用思考的速度交易,用真诚的态度合作,我们是认真的!
  • 关注公众号
  • 添加微信客服
Copyright © 2001-2026 迅投QMT社区 版权所有 All Rights Reserved. 京ICP备2025122616号-3
关灯 快速发帖
扫一扫添加微信客服
QQ客服返回顶部
快速回复 返回顶部 返回列表