返回列表 发布新帖

使用外部pyCharm 像使用http一样获取大mqt 数据

2482 1
发表于 2024-6-11 21:41:00 | 显示全部楼层 阅读模式

写在前面,需要用到redis, 需要有一定的解决问题能力,qmt 使用redis 会报一个错误,找到qmt对应代码注释即可

1、获取数据代码,此代码放在qmt里面点击运行即可

# encoding:gbk
import pandas as pd
from datetime import datetime, timedelta
import RedisUtils as redis
import json



class GlobalVariable():
    def __init__(self):
        self.is_pro = False  # 是否是实盘
        self.acc = ''
        self.strategy_name = '实盘' if self.is_pro else 'test'
        self.stream_id = '0'
        self.context = None


g = GlobalVariable()


def init(c):
    c.accID = ''
    g.is_pro = not c.do_back_test
    c.run_time("myHandlebar", "100nMilliSecond", "2019-10-14 13:20:00")
    print('======================================init=====end==============================================')


def handlebar(c):
    date = timetag_to_datetime(c.get_bar_timetag(c.barpos), '%Y%m%d')
    date_time = timetag_to_datetime(c.get_bar_timetag(c.barpos), '%Y%m%d%H%M%S')
    if g.is_pro:
        date_time = timetag_to_datetime(c.get_tick_timetag(), '%Y%m%d%H%M%S')
        date = timetag_to_datetime(c.get_tick_timetag(), '%Y%m%d')
    g.context = c
    # date_time = date + '093100'
    print(f'===================实盘环境【{g.is_pro}】========handlebar===【{date_time}】==start===================')


def handle_request(request_id, request_data):
    # 处理请求逻辑
    response_data = f"Processed: {request_data}"
    body = json.loads(request_data)
    print(f'===================body==========【{body}】==start===================')
    sock_list = body['sock_list']
    date_time = body['date_time']
    count = body['count']
    period = body['period']
    print(f'===================handle_request==========【{date_time}】==start===================')
    response_data = get_market_data_cache(g.context, sock_list, date_time, count, period)
    return response_data


# 创建一个流读取器


def myHandlebar(c):
    # 读取请求流中的消息
    REQUEST_STREAM = 'http_requests'
    # 定义请求和响应流名称
    RESPONSE_STREAM = 'http_responses'
    request = redisUtils.conn.xread({REQUEST_STREAM: g.stream_id}, block=0, count=1)
    if request:
        stream_name, messages = request[0]
        print(f'收到请求 {stream_name}: {messages}')
        message_id, message_data = messages[0]
        g.stream_id = message_id  # 更新stream_id以读取下一个消息
        # 获取响应数据
        request_id = message_data['request_id']
        request_body = message_data['body']
        # 处理请求并生成响应
        response_body = handle_request(request_id, request_body)
        # 将返回数据有字典里面包含df 转json。
        response_body = dict_to_df(response_body)
        # 将响应消息发送到响应流中
        redisUtils.conn.xadd(RESPONSE_STREAM, {'request_id': request_id, 'body': response_body.to_json()})
        redisUtils.conn.expire(request_id, '10')  # 设置键过期时间



def get_market_data_cache(c, stock_list, end_time, count, period):
    def process_stock_data(c, stock_code, end_time, count, period, trading_dates):
        stock_data = get_stock_kline_data(stock_code, 0, end_time, count, period)
        if len(stock_data) > 0:
            df_stock_data = pd.DataFrame(stock_data)
            stime_set = set(df_stock_data['stime'])
            for date in trading_dates:
                if date not in stime_set:
                    stock_data_qmt = c.get_market_data_ex([], [stock_code], period=period, end_time=date, count=1,
                                                          dividend_type='follow', fill_data=True, subscribe=True)
                    date_df = stock_data_qmt[stock_code]
                    save_stock_kline_data(stock_code, date_df, period)
                    df_stock_data = df_stock_data.append(date_df)
        else:
            stock_data_qmt = c.get_market_data_ex([], [stock_code], period=period, end_time=end_time, count=count,
                                                  dividend_type='follow', fill_data=True, subscribe=True)
            df_stock_data = stock_data_qmt[stock_code]
            save_stock_kline_data(stock_code, df_stock_data, period)
        df_stock_data.sort_index(axis=1, ascending=False, inplace=True)
        return df_stock_data  # 返回一个字典,键是股票代码,值是对应的 DataFrame

    result_stock_data = {}
    trading_dates = c.get_trading_dates('', '', end_time, count, period)
    for code in stock_list:
        df_stock_data = process_stock_data(c, code, end_time, count, period, trading_dates)
        result_stock_data[code] = df_stock_data
    return result_stock_data


def get_stock_kline_data(stock_code, start_time, end_time, count=None, period='1d'):
    # 从Redis中按时间范围获取股票K线数据
    redis_key = f"stock_kline_{period}:{stock_code}"
    if count:
        stock_data = redisUtils.conn.zrevrangebyscore(redis_key, end_time, '-inf', start=0, num=count, withscores=True)
    else:
        stock_data = redisUtils.conn.zrevrangebyscore(redis_key, end_time, '-inf', withscores=True)
    # 将数据转换为DataFrame
    return [(json.loads(data)) for data, timestamp in stock_data]


def save_stock_kline_data(stock_code, stock_df, period):
    def save_kline_data(redis_key, x):
        # 获取具有相同 score 的所有成员
        existing_members = redisUtils.conn.zrangebyscore(redis_key, x['stime'], x['stime'])
        # 如果存在多个相同 score 的成员
        if len(existing_members) > 1:
            # 删除多余的成员,只保留一个
            redisUtils.conn.zremrangebyrank(redis_key, 1, -2)
        # 存储股票K线数据到Redis的有序集合中
        redisUtils.conn.zadd(redis_key, {
            json.dumps(
                {'stime': x['stime'], 'open': x['open'], 'high': x['high'], 'low': x['low'], 'close': x['close'],
                 'volume': x['volume'], 'amount': x['amount'], 'preClose': x['preClose'], 'suspendFlag':
                     x['suspendFlag']}): x['stime']})

    # 存储股票K线数据到Redis的有序集合中
    redis_key = f"stock_kline_{period}:{stock_code}"
    stock_df.apply(lambda x: save_kline_data(redis_key, x), axis=1)

def dict_to_df(data_dict):
    data_df = pd.concat(data_dict.values(), keys=data_dict.keys())
    # 设置索引名称为 'code','time'
    data_df.index.names = ['code', 'index']
    # 重置索引
    data_df = data_df.reset_index()
    # 设置索引为 'time'
    data_df = data_df.set_index('stime')
    # 去掉index列
    data_df = data_df.drop(['index'], axis=1)
    return data_df

# 设置df 打印最大行数和列数
pd.set_option('display.max_columns', 1000)
pd.set_option('display.width', 1000)
# 获取日期
date_g = datetime.now().strftime('%Y%m%d')
redisUtils = redis.RedisUtils('localhost', '6379', db=1, decode_responses=True)

2.此代码在外第三方编辑器内使用

import RedisUtils as redis
import uuid
import time
import json
import pandas as pd

# 连接到本地的Redis服务器
redisUtils = redis.RedisUtils('localhost', '6379', db=1, decode_responses=True)
pd.set_option('display.max_columns', 1000)
pd.set_option('display.width', 1000)

# 定义请求和响应流名称
REQUEST_STREAM = 'http_requests'
RESPONSE_STREAM = 'http_responses'
TIMEOUT = 10  # 超时时间(秒)


def send_request(body):
    # 生成唯一的请求ID
    request_id = str(uuid.uuid4())

    # 将请求消息发送到请求流中
    redisUtils.conn.xadd(REQUEST_STREAM, {b'request_id': request_id, b'body': body.encode('utf-8')})
    # 创建一个流读取器,等待响应
    stream_id = '0'
    start_time = time.time()

    while True:
        # 检查是否超时
        elapsed_time = time.time() - start_time
        if elapsed_time > TIMEOUT:
            return f"Error: 请求超时 {TIMEOUT} 秒."

        # 读取响应流中的消息
        response = redisUtils.conn.xread({RESPONSE_STREAM: stream_id}, block=1000, count=1)  # 1秒的阻塞时间
        if response:
            stream_name, messages = response[0]
            message_id, message_data = messages[0]
            stream_id = message_id  # 更新stream_id以读取下一个消息
            # 获取响应数据
            response_request_id = message_data['request_id']
            response_body = message_data['body']

            # 检查响应是否与当前请求匹配
            if response_request_id == request_id:
                redisUtils.conn.xdel(RESPONSE_STREAM, message_id)
                return response_body


if __name__ == '__main__':
    jsonBody = '{"sock_list": ["513500.SH"], "date_time": "20240601", "count": 100, "period": "1d"}'
    # 创建json
    response_body = send_request(jsonBody)
    # 将json 转成df=
    df = pd.DataFrame(json.loads(response_body))
    print(f"返回数据: {df}")

3、Redis工具类

import redis


# RedisUtils 操作工具类
class RedisUtils:

    def __init__(self, host='1127.0.01', port='6379', db=0, decode_responses=True):
        """
        建立数据库连接
        :param db: 传入数据库名称
        :param decode_responses: 传入是否解码
        """
        # 建立数据库连接
        self.conn = redis.Redis(connection_pool=redis.ConnectionPool(
            host=host,
            port=port,
            db=db,
            decode_responses=decode_responses))

    def __del__(self):  # 对象资源被释放时触发,在对象即将被删除时的最后操作
        # 关闭数据库连接
        self.conn.close()

    '''
        list相关操作方法
    '''

    # 创建或者增加列表数据的操作 rpush, lpush
    def list_push(self, key, push_var='r', *value):
        # print(value)
        if push_var == 'r':
            self.conn.rpush(key, *value)
        elif push_var == 'l':
            self.conn.lpush(key, *value)

    # 删除列表数据的操作 lpop, rpop, lrem指定删除 count=0 代表删除全部
    #    count 也代表数量
    def list_pop(self, key, count, value, pop_var='r'):
        if pop_var == 'r':
            # 从右边删除
            self.conn.rpop(key)
        elif pop_var == 'l':
            # 从左边删除
            self.conn.lpop(key)
        elif pop_var == 'm':
            # 指定删除全部元素
            self.conn.lrem(key, count, value)
        elif pop_var == 'c':
            list2 = self.conn.lrange(key, 0, -1)
            # 遍历删除全部元素
            for value in list2:
                self.conn.lrem(key, count, value)

    # 修改所在索引的元素:lset lset key index value
    def list_set(self, key, index, value):
        self.conn.lset(key, index, value)

    # 查看列表元素所在的索引:lrange
    def list_get(self, key, start_index, end_index):
        print(self.conn.lrange(key, start_index, end_index))

评论1

木头
发表于 2024-6-13 11:51:18 | 显示全部楼层
这么麻烦还不如直接用通达信数据呢

回复

您需要登录后才可以回帖 登录 | 立即注册

客服专线

400-080-8112

用思考的速度交易,用真诚的态度合作,我们是认真的!
  • 关注公众号
  • 添加微信客服
Copyright © 2001-2025 迅投QMT社区 版权所有 All Rights Reserved. 京ICP备2025122616号-3
关灯 快速发帖
扫一扫添加微信客服
QQ客服返回顶部
快速回复 返回顶部 返回列表