写在前面,需要用到redis, 需要有一定的解决问题能力,qmt 使用redis 会报一个错误,找到qmt对应代码注释即可
1、获取数据代码,此代码放在qmt里面点击运行即可
# encoding:gbk
import pandas as pd
from datetime import datetime, timedelta
import RedisUtils as redis
import json
class GlobalVariable():
def __init__(self):
self.is_pro = False # 是否是实盘
self.acc = ''
self.strategy_name = '实盘' if self.is_pro else 'test'
self.stream_id = '0'
self.context = None
g = GlobalVariable()
def init(c):
c.accID = ''
g.is_pro = not c.do_back_test
c.run_time("myHandlebar", "100nMilliSecond", "2019-10-14 13:20:00")
print('======================================init=====end==============================================')
def handlebar(c):
date = timetag_to_datetime(c.get_bar_timetag(c.barpos), '%Y%m%d')
date_time = timetag_to_datetime(c.get_bar_timetag(c.barpos), '%Y%m%d%H%M%S')
if g.is_pro:
date_time = timetag_to_datetime(c.get_tick_timetag(), '%Y%m%d%H%M%S')
date = timetag_to_datetime(c.get_tick_timetag(), '%Y%m%d')
g.context = c
# date_time = date + '093100'
print(f'===================实盘环境【{g.is_pro}】========handlebar===【{date_time}】==start===================')
def handle_request(request_id, request_data):
# 处理请求逻辑
response_data = f"Processed: {request_data}"
body = json.loads(request_data)
print(f'===================body==========【{body}】==start===================')
sock_list = body['sock_list']
date_time = body['date_time']
count = body['count']
period = body['period']
print(f'===================handle_request==========【{date_time}】==start===================')
response_data = get_market_data_cache(g.context, sock_list, date_time, count, period)
return response_data
# 创建一个流读取器
def myHandlebar(c):
# 读取请求流中的消息
REQUEST_STREAM = 'http_requests'
# 定义请求和响应流名称
RESPONSE_STREAM = 'http_responses'
request = redisUtils.conn.xread({REQUEST_STREAM: g.stream_id}, block=0, count=1)
if request:
stream_name, messages = request[0]
print(f'收到请求 {stream_name}: {messages}')
message_id, message_data = messages[0]
g.stream_id = message_id # 更新stream_id以读取下一个消息
# 获取响应数据
request_id = message_data['request_id']
request_body = message_data['body']
# 处理请求并生成响应
response_body = handle_request(request_id, request_body)
# 将返回数据有字典里面包含df 转json。
response_body = dict_to_df(response_body)
# 将响应消息发送到响应流中
redisUtils.conn.xadd(RESPONSE_STREAM, {'request_id': request_id, 'body': response_body.to_json()})
redisUtils.conn.expire(request_id, '10') # 设置键过期时间
def get_market_data_cache(c, stock_list, end_time, count, period):
def process_stock_data(c, stock_code, end_time, count, period, trading_dates):
stock_data = get_stock_kline_data(stock_code, 0, end_time, count, period)
if len(stock_data) > 0:
df_stock_data = pd.DataFrame(stock_data)
stime_set = set(df_stock_data['stime'])
for date in trading_dates:
if date not in stime_set:
stock_data_qmt = c.get_market_data_ex([], [stock_code], period=period, end_time=date, count=1,
dividend_type='follow', fill_data=True, subscribe=True)
date_df = stock_data_qmt[stock_code]
save_stock_kline_data(stock_code, date_df, period)
df_stock_data = df_stock_data.append(date_df)
else:
stock_data_qmt = c.get_market_data_ex([], [stock_code], period=period, end_time=end_time, count=count,
dividend_type='follow', fill_data=True, subscribe=True)
df_stock_data = stock_data_qmt[stock_code]
save_stock_kline_data(stock_code, df_stock_data, period)
df_stock_data.sort_index(axis=1, ascending=False, inplace=True)
return df_stock_data # 返回一个字典,键是股票代码,值是对应的 DataFrame
result_stock_data = {}
trading_dates = c.get_trading_dates('', '', end_time, count, period)
for code in stock_list:
df_stock_data = process_stock_data(c, code, end_time, count, period, trading_dates)
result_stock_data[code] = df_stock_data
return result_stock_data
def get_stock_kline_data(stock_code, start_time, end_time, count=None, period='1d'):
# 从Redis中按时间范围获取股票K线数据
redis_key = f"stock_kline_{period}:{stock_code}"
if count:
stock_data = redisUtils.conn.zrevrangebyscore(redis_key, end_time, '-inf', start=0, num=count, withscores=True)
else:
stock_data = redisUtils.conn.zrevrangebyscore(redis_key, end_time, '-inf', withscores=True)
# 将数据转换为DataFrame
return [(json.loads(data)) for data, timestamp in stock_data]
def save_stock_kline_data(stock_code, stock_df, period):
def save_kline_data(redis_key, x):
# 获取具有相同 score 的所有成员
existing_members = redisUtils.conn.zrangebyscore(redis_key, x['stime'], x['stime'])
# 如果存在多个相同 score 的成员
if len(existing_members) > 1:
# 删除多余的成员,只保留一个
redisUtils.conn.zremrangebyrank(redis_key, 1, -2)
# 存储股票K线数据到Redis的有序集合中
redisUtils.conn.zadd(redis_key, {
json.dumps(
{'stime': x['stime'], 'open': x['open'], 'high': x['high'], 'low': x['low'], 'close': x['close'],
'volume': x['volume'], 'amount': x['amount'], 'preClose': x['preClose'], 'suspendFlag':
x['suspendFlag']}): x['stime']})
# 存储股票K线数据到Redis的有序集合中
redis_key = f"stock_kline_{period}:{stock_code}"
stock_df.apply(lambda x: save_kline_data(redis_key, x), axis=1)
def dict_to_df(data_dict):
data_df = pd.concat(data_dict.values(), keys=data_dict.keys())
# 设置索引名称为 'code','time'
data_df.index.names = ['code', 'index']
# 重置索引
data_df = data_df.reset_index()
# 设置索引为 'time'
data_df = data_df.set_index('stime')
# 去掉index列
data_df = data_df.drop(['index'], axis=1)
return data_df
# 设置df 打印最大行数和列数
pd.set_option('display.max_columns', 1000)
pd.set_option('display.width', 1000)
# 获取日期
date_g = datetime.now().strftime('%Y%m%d')
redisUtils = redis.RedisUtils('localhost', '6379', db=1, decode_responses=True)
2.此代码在外第三方编辑器内使用
import RedisUtils as redis
import uuid
import time
import json
import pandas as pd
# 连接到本地的Redis服务器
redisUtils = redis.RedisUtils('localhost', '6379', db=1, decode_responses=True)
pd.set_option('display.max_columns', 1000)
pd.set_option('display.width', 1000)
# 定义请求和响应流名称
REQUEST_STREAM = 'http_requests'
RESPONSE_STREAM = 'http_responses'
TIMEOUT = 10 # 超时时间(秒)
def send_request(body):
# 生成唯一的请求ID
request_id = str(uuid.uuid4())
# 将请求消息发送到请求流中
redisUtils.conn.xadd(REQUEST_STREAM, {b'request_id': request_id, b'body': body.encode('utf-8')})
# 创建一个流读取器,等待响应
stream_id = '0'
start_time = time.time()
while True:
# 检查是否超时
elapsed_time = time.time() - start_time
if elapsed_time > TIMEOUT:
return f"Error: 请求超时 {TIMEOUT} 秒."
# 读取响应流中的消息
response = redisUtils.conn.xread({RESPONSE_STREAM: stream_id}, block=1000, count=1) # 1秒的阻塞时间
if response:
stream_name, messages = response[0]
message_id, message_data = messages[0]
stream_id = message_id # 更新stream_id以读取下一个消息
# 获取响应数据
response_request_id = message_data['request_id']
response_body = message_data['body']
# 检查响应是否与当前请求匹配
if response_request_id == request_id:
redisUtils.conn.xdel(RESPONSE_STREAM, message_id)
return response_body
if __name__ == '__main__':
jsonBody = '{"sock_list": ["513500.SH"], "date_time": "20240601", "count": 100, "period": "1d"}'
# 创建json
response_body = send_request(jsonBody)
# 将json 转成df=
df = pd.DataFrame(json.loads(response_body))
print(f"返回数据: {df}")
3、Redis工具类
import redis
# RedisUtils 操作工具类
class RedisUtils:
def __init__(self, host='1127.0.01', port='6379', db=0, decode_responses=True):
"""
建立数据库连接
:param db: 传入数据库名称
:param decode_responses: 传入是否解码
"""
# 建立数据库连接
self.conn = redis.Redis(connection_pool=redis.ConnectionPool(
host=host,
port=port,
db=db,
decode_responses=decode_responses))
def __del__(self): # 对象资源被释放时触发,在对象即将被删除时的最后操作
# 关闭数据库连接
self.conn.close()
'''
list相关操作方法
'''
# 创建或者增加列表数据的操作 rpush, lpush
def list_push(self, key, push_var='r', *value):
# print(value)
if push_var == 'r':
self.conn.rpush(key, *value)
elif push_var == 'l':
self.conn.lpush(key, *value)
# 删除列表数据的操作 lpop, rpop, lrem指定删除 count=0 代表删除全部
# count 也代表数量
def list_pop(self, key, count, value, pop_var='r'):
if pop_var == 'r':
# 从右边删除
self.conn.rpop(key)
elif pop_var == 'l':
# 从左边删除
self.conn.lpop(key)
elif pop_var == 'm':
# 指定删除全部元素
self.conn.lrem(key, count, value)
elif pop_var == 'c':
list2 = self.conn.lrange(key, 0, -1)
# 遍历删除全部元素
for value in list2:
self.conn.lrem(key, count, value)
# 修改所在索引的元素:lset lset key index value
def list_set(self, key, index, value):
self.conn.lset(key, index, value)
# 查看列表元素所在的索引:lrange
def list_get(self, key, start_index, end_index):
print(self.conn.lrange(key, start_index, end_index))