今天在官网看到一个例子,聚宽策略迁移至QMT,我很久没有上官网了最近在学习大qmt,我一般用miniqmt,自己建立了一个系统,使用方便qmt和聚宽代码结构非常的相同,聚宽代码和zipline基本一样,使用简单刚刚运行了一个qmt行业轮动的策略,软件默认的大概的回测结构
我们登录聚宽看代码格式默认的策略,简单的均线策略
- # 导入函数库
- from jqdata import *
- # 初始化函数,设定基准等等
- def initialize(context):
- # 设定沪深300作为基准
- set_benchmark('000300.XSHG')
- # 开启动态复权模式(真实价格)
- set_option('use_real_price', True)
- # 输出内容到日志 log.info()
- log.info('初始函数开始运行且全局只运行一次')
- # 过滤掉order系列API产生的比error级别低的log
- # log.set_level('order', 'error')
- ### 股票相关设定 ###
- # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
- set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
- ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
- # 开盘前运行
- run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
- # 开盘时运行
- run_daily(market_open, time='open', reference_security='000300.XSHG')
- # 收盘后运行
- run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
- ## 开盘前运行函数
- def before_market_open(context):
- # 输出运行时间
- log.info('函数运行时间(before_market_open):'+str(context.current_dt.time()))
- # 给微信发送消息(添加模拟交易,并绑定微信生效)
- # send_message('美好的一天~')
- # 要操作的股票:平安银行(g.为全局变量)
- g.security = '000001.XSHE'
- ## 开盘时运行函数
- def market_open(context):
- log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
- security = g.security
- # 获取股票的收盘价
- close_data = get_bars(security, count=5, unit='1d', fields=['close'])
- # 取得过去五天的平均价格
- MA5 = close_data['close'].mean()
- # 取得上一时间点价格
- current_price = close_data['close'][-1]
- # 取得当前的现金
- cash = context.portfolio.available_cash
- # 如果上一时间点价格高出五天平均价1%, 则全仓买入
- if (current_price > 1.01*MA5) and (cash > 0):
- # 记录这次买入
- log.info("价格高于均价 1%%, 买入 %s" % (security))
- # 用所有 cash 买入股票
- order_value(security, cash)
- # 如果上一时间点价格低于五天平均价, 则空仓卖出
- elif current_price < MA5 and context.portfolio.positions[security].closeable_amount > 0:
- # 记录这次卖出
- log.info("价格低于均价, 卖出 %s" % (security))
- # 卖出所有股票,使这只股票的最终持有量为0
- order_target(security, 0)
- ## 收盘后运行函数
- def after_market_close(context):
- log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
- #得到当天所有成交记录
- trades = get_trades()
- for _trade in trades.values():
- log.info('成交记录:'+str(_trade))
- log.info('一天结束')
- log.info('##############################################################')
复制代码
我直接利用跟单程序跟单简单,支持成交模式,持股模式
可以获取组合数据
成交模式跟单,支持同qmt
- def get_simultaneous_transaction_models(self):
- '''
- 交易模式2,同步成交,不会同步目前的持股
- '''
- trader_time_list=self.stock_data.get_trader_date_list()
- now_date=trader_time_list[-1]
- #now_date='2023-11-02'
- hold_stock=pd.read_excel(r'持股数据\持股数据.xlsx',dtype='object')
- account=pd.read_excel(r'账户数据\账户数据.xlsx')
- joinquant_trader=self.joinquant_data.get_backtrader_trader_log(date=now_date)
- joinquant_hold=self.joinquant_data.get_position(date=now_date)
- #账户可用数量
- if joinquant_trader.shape[0]>0:
- for stock,amount,transaction in zip(joinquant_trader['证券代码'],joinquant_trader['amount'],joinquant_trader['transaction']):
- try:
- hold=hold_stock[hold_stock['证券代码']==stock]
- if hold.shape[0]>0:
- av_amount=hold['可用余额'].tolist()[-1]
- else:
- av_amount=0
- #可用金额
- av_money=account['可用金额'].tolist()[-1]
- amount=amount*self.ratio
-
- data_type=self.trader.select_data_type(stock=stock)
- amount=self.trader.adjust_amount(stock=stock,amount=amount)
- if data_type in ['stock','fund'] and amount<100:
- amount=100
- elif data_type=='bond' and amount<10:
- amount=10
- else:
- amount=amount
- try:
- price=self.data.get_spot_data(stock=stock)['最新价']
- except:
- try:
- price=self.tdx_data.get_security_quotes_none(stock=stock)['price'].tolist()[-1]
- except:
- try:
- price=self.data.get_spot_trader_data(stock=stock)['价格'].tolist()[-1]
- except:
- price=self.data.get_hist_data_em(stock=stock)['close'].tolist()[-1]
- #买入的价值
- value=amount*price
- joinquant_hold_amount=joinquant_hold[joinquant_hold['证券代码']==stock]
- if joinquant_hold_amount.shape[0]>0:
- joinquant_amount=joinquant_hold_amount['amount'].tolist()[-1]
- else:
- joinquant_amount=0
- #账户持股数量
- hold=hold_stock[hold_stock['证券代码']==stock]
- if hold.shape[0]>0:
- hold_amount=hold['股票余额'].tolist()[-1]
- else:
- hold_amount=0
- if transaction=='卖':
- #如果账户可用持股大于聚宽下单,直接卖
- if joinquant_amount==hold_amount:
- print('聚宽跟单{}已经卖出'.format(stock))
- else:
- if av_amount>0:
- if av_amount>=amount:
- self.trader.sell(security=stock,amount=amount,price=price)
- text1='聚宽跟单卖出代码{} 价格{} 数量{}'.format(stock,price,amount)
- self.base_func.adjust_hold_data(stock=stock,trader_type='sell',price=price,amount=amount)
- self.base_func.adjust_account_cash(stock=stock,trader_type='sell',price=price,amount=amount)
- self.base_func.seed_trader_info(text=text1)
- #持股数量小于下单数据直接卖出全部可用卖出的
- else:
- amount=av_amount
- self.trader.sell(security=stock,amount=amount,price=price)
- text1='聚宽跟单卖出代码{} 价格{} 数量{}'.format(stock,price,amount)
- self.base_func.adjust_hold_data(stock=stock,trader_type='sell',price=price,amount=amount)
- self.base_func.adjust_account_cash(stock=stock,trader_type='sell',price=price,amount=amount)
- self.base_func.seed_trader_info(text=text1)
- else:
- print('聚宽跟单{}已经卖出'.format(stock))
- #买入的情况
- else:
- #
- if joinquant_amount==hold_amount:
- print('聚宽跟单{}已经买入'.format(stock))
- else:
- #可用资金大于下单价值
- if av_money>value:
- self.trader.buy(security=stock,amount=amount,price=price)
- text1='聚宽跟单买入代码{} 价格{} 数量{}'.format(stock,price,amount)
- print(text1)
- self.base_func.adjust_hold_data(stock=stock,trader_type='buy',price=price,amount=amount)
- self.base_func.adjust_account_cash(stock=stock,trader_type='buy',price=price,amount=amount)
- self.base_func.seed_trader_info(text=text1)
- else:
- text1='聚宽跟单买入代码失败可用资金{}不足'.format(av_money)
- except Exception as e:
- print('错误',e)
- else:
- print('{}没有成交'.format(now_date))
复制代码
我们继续看qmt的代码格式和聚合宽一个框架的格式
- #encoding:gbk
- '''
- 本策略事先设定好交易的股票篮子,然后根据指数的CCI指标来判断超买和超卖
- 当有超买和超卖发生时,交易事先设定好的股票篮子
- '''
- import pandas as pd
- import numpy as np
- import talib
- def init(ContextInfo):
- #hs300成分股中sh和sz市场各自流通市值最大的前3只股票
- ContextInfo.trade_code_list=['601398.SH','601857.SH','601288.SH','000333.SZ','002415.SZ','000002.SZ']
- ContextInfo.set_universe(ContextInfo.trade_code_list)
- ContextInfo.accID = '6000000058'
- ContextInfo.buy = True
- ContextInfo.sell = False
-
- def handlebar(ContextInfo):
- #计算当前主图的cci
- mkdict = ContextInfo.get_market_data(['high','low','close'],count=int(period)+1)
- highs = np.array(mkdict['high'])
- lows = np.array(mkdict['low'])
- closes = np.array(mkdict['close'])
- cci_list = talib.CCI(highs,lows,closes,timeperiod=int(period))
- now_cci = cci_list[-1]
- ContextInfo.paint("CCI",now_cci,-1,0,'noaxis')
-
- #交易策略
- if len(cci_list)<2:
- return
-
- #买入条件:指数CCI进入超卖区间时触发买入信号,过滤连续超卖导致的买入信号
- buy_condition = cci_list[-2]<buy_value<=now_cci and ContextInfo.buy
- #卖出条件:指数CCI进入超买区间时触发卖出信号,过滤连续超买导致的卖出信号
- sell_condition = cci_list[-2]>sell_value>=now_cci and ContextInfo.sell
-
- if buy_condition:
- ContextInfo.buy = False
- ContextInfo.sell = True
- #列表中股票分别下单买入10手
- for stockcode in ContextInfo.trade_code_list:
- order_lots(stockcode,10,ContextInfo,ContextInfo.accID)
- elif sell_condition:
- ContextInfo.buy = True
- ContextInfo.sell = False
- #列表中股票分别下单卖出10手
- for stockcode in ContextInfo.trade_code_list:
- order_lots(stockcode,-10,ContextInfo,ContextInfo.accID)
-
- #可买或可卖状态
- ContextInfo.draw_text(bool(buy_condition),float(now_cci),'buy') #绘制买点
- ContextInfo.draw_text(bool(sell_condition),float(now_cci),'sell') #绘制卖点
- ContextInfo.paint('can_buy',ContextInfo.buy,-1,0,'nodraw')
- ContextInfo.paint('can_sell',ContextInfo.sell,-1,0,'nodraw')
复制代码 简单的例子格式聚宽代码结构
- def initialize(context):
- # 定义一个全局变量, 保存要操作的股票,如000001平安银行
- g.security = '000001.XSHE'
- # 运行函数
- run_daily(market_open, time='every_bar')
- # 每个单位时间(如果按天回测,则每天调用一次,如果按分钟,则每分钟调用一次)调用一次
- def market_open(context):
- if g.security not in context.portfolio.positions:
- order(g.security, 1000)
- else:
- order(g.security, -800)
复制代码 qmt代码框架
- #示例说明:本策略,在回测的每个周期买入主图标的
- def init(C):
- #init handlebar函数的入参是ContextInfo对象 可以缩写为C
- #设置测试标的为主图品种
- C.stock= C.stockcode + '.' +C.market
- #accountid为测试的ID 回测模式资金账号可以填任意字符串
- C.accountid = "testS"
- def handlebar(C):
- # handlebar在回测时,会在回测周期的每根bar被执行一次
- # handlebar在实时行情下,会随着主图tick的更新被调用
- # passorder是QMT的综合下单函数,具体参数字段参考官方文档
- passorder(23, 1101,C.accountid, C.stock, 5, -1, 1, C)
复制代码 聚宽代码移植QMT示例聚宽代码
当价格高于5日均线平均价格1.05时买入,当价格低于5日平均价格0.95时卖出
- # 导入函数库
- import jqdata
- # 初始化函数,设定要操作的股票、基准等等
- def initialize(context):
- # 定义一个全局变量, 保存要操作的股票
- # 000001(股票:平安银行)
- g.security = '000001.XSHE'
- # 设定沪深300作为基准
- set_benchmark('000300.XSHG')
- # 开启动态复权模式(真实价格)
- set_option('use_real_price', True)
- # 每个单位时间(如果按天回测,则每天调用一次,如果按分钟,则每分钟调用一次)调用一次
- def handle_data(context, data):
- security = g.security
- # 获取股票的收盘价
- close_data = attribute_history(security, 5, '1d', ['close'])
- # 取得过去五天的平均价格
- MA5 = close_data['close'].mean()
- # 取得上一时间点价格
- current_price = close_data['close'][-1]
- # 取得当前的现金
- cash = context.portfolio.cash
- # 如果上一时间点价格高出五天平均价5%, 则全仓买入
- if current_price > 1.05*MA5:
- # 用所有 cash 买入股票
- order_value(security, cash)
- # 记录这次买入
- log.info("Buying %s" % (security))
- # 如果上一时间点价格低于五天平均价, 则空仓卖出
- elif current_price < 0.95*MA5 and context.portfolio.positions[security].closeable_amount > 0:
- # 卖出所有股票,使这只股票的最终持有量为0
- order_target(security, 0)
- # 记录这次卖出
- log.info("Selling %s" % (security))
- # 画出上一时间点价格
- record(stock_price=current_price)
复制代码 qmt代码迁移
- # QMT是一个本地端界面化软件,回测基准,回测手续费,回测起止时间都可在界面右侧栏进行设置
- # 初始化函数,设定要操作的股票,参数等
- def init(C):
- # 定义一个全局变量,设定要操作的股票
- # C.stock_list = C.get_stock_list_in_sector("沪深300") # 获取沪深300股票列表
- C.stock_list = ["000001.SZ"]
- # 设定回测初始资金
- C.capital = 1000000
- # 设定回测账号,实盘中账号在交易设置截面选择
- C.account_id = "testaccID"
- # # 关于回测时间,既可以在编辑器右侧栏设置,也可通过代码设置
- C.start = '2017-06-06 00:00:00'
- C.end = '2020-06-06 10:00:00'
- def handlebar(C):
- #当前k线日期
- bar_date = timetag_to_datetime(C.get_bar_timetag(C.barpos), '%Y%m%d%H%M%S')
- # 获取市场行情,具体参数释义见文档
- market_data = C.get_market_data_ex(["open", "high", "low", "close"],C.stock_list,period = "1d",end_time = bar_date)
- # 获取当前账户资金
- for i in get_trade_detail_data(C.account_id,"stock","account"):
- cash = i.m_dAvailable
- # 获取当前持仓信息,本示例中的holding_dict结构是{stock_code:lots}
- holding_dict = {obj.m_strInstrumentID+"."+obj.m_strExchangeID : obj.m_nVolume for obj in get_trade_detail_data(C.account_id,"stock","position")}
-
- # 遍历gmd返回的字典数据
- for i in market_data:
- # 获取K线数据
- kline = market_data[i]
- # 获取收盘价序列
- close_data = kline["close"]
- # 计算MA5
- MA5 = close_data.rolling(5).mean()
- # 如果上一时间点价格高出五天平均价5%, 且当前无持仓, 则全仓买入
- if close_data.iloc[-1] > 1.05 * MA5.iloc[-1] and i not in holding_dict.keys():
- # 全仓买入,交易记录会被客户端自动记录在回测结果,此处展示按金额交易的方法
- passorder(23, 1123, C.account_id, i, 5, -1, 1, C)
- print(f"{bar_date}——{i}触发买入")
- elif close_data.iloc[-1] < 0.95 * MA5.iloc[-1] and i in holding_dict.keys():
- # 获取当前持仓数量
- lots = holding_dict[i]
- # 全仓卖出,交易记录会被客户端自动记录在回测结果,此处展示按股数交易的方法
- passorder(24, 1101, C.account_id, i, 5, -1, lots, C)
- print(f"{bar_date}——{i}触发卖出")
复制代码 [color=rgba(0, 0, 0, 0.9)]聚宽跟单代码,支持qmt可以下载
|