返回列表 发布新帖

量化研究---年化30%小果全球大类低相关性动量趋势增强轮动策略回测

213 0
发表于 2025-4-17 11:27:11 | 显示全部楼层 阅读模式

最近看了很多动量模型,有利用涨跌幅,相对收益,斜率等来排序轮动

文章链接https://mp.weixin.qq.com/s/9NUoOVx8gMknrK-tQiCJKg

小果全球大类低相关性动量趋势增强轮动策略我研究的利用趋势模型加动量模型,趋势模型用来选择交易时间,动量模型用来排序股票趋势强度,选择全球大类低相关性的标的来轮动这个也是我实盘的模型思路,全球配置,趋势降低,降低相关性,波动率。

网页数据强大的仿真交易系统,我实盘的是全球大类低相关性策略

93e6a29062c756481c3d8cb1ce0c9968.png

点击低相关性策略

历史回测

4b6d82b1d083c00dd047619a4e56ebac.png

交易报告

7669273203fa55102115d0a26371b1b4.png

6040e4ff1938ed16d9f08a1a1fd4b8d8.png

持股数据

ee947aefb1272eae8057ea36e588063e.png

自定义股票池

6ad9914e380e67eb58d67112e1b5cab0.png

qmt的回测数据

824b1ee3e2376468965061d64ff13948.png

75ce4c496a1a133d38c62750144fd00a.png

回测的结果数据

7332e47956c94cbc466af3a436492cfb.png

33c3e6fa00fe5b8c09801dca763c4859.png

交易数据

ed6001cb44c7838b183c4f1d01cc50f5.png

ed6001cb44c7838b183c4f1d01cc50f5.png

当然我也给了实盘的代码

d16843521527e80d4a5d1ca76c71f56b.png

实盘源代码我放知识星球了下载利用使用

19f8c43086a623ad3fe2d37c1dbb40cf.jpg

e43fa88a8d83075b5b1df5e45ed71760.png

回测代码下载,直接回复20250416就可以,点赞分享就可以,我一个人编写研究的内容

不懂的问我就可以,加我备注入群可以加入量化研究群

186f5c21be6307d2a4b6d862958e9542.jpg

qmt回测源代码

#encoding:gbk
'''
小果全球大类低相关性动量趋势增强轮动策略回测
实盘知识星球下载
作者:小果量化
微信:xg_quant
时间:20250416
'''
import pandas as pd
import numpy as np
import talib
def init(c):
    #账户
    c.account=''
    #账户类型
    c.account_type='STOCK'
    #开始时间
    c.start='20241201 00:00:00'
    #结束时间
    c.end='20500101 00:00:00'
    #测试资金
    c.capital=300000
    c.stock_list=['513100.SH', '511130.SH', '159937.SZ', '513350.SH', 
        '512890.SH', '159915.SZ', '513500.SH', '159985.SZ', '159981.SZ',
        '159980.SZ', '513300.SH', '159680.SZ', '511090.SH', '513400.SH', '159934.SZ']
    c.name_list=['纳斯达克ETF', '30年债券ETF', '黄金ETF', '标普油气ETF',
        '红利ETF', '创业板ETF', '标普500ETF', '豆粕ETF', '能源化工ETF',
        '有色ETF', '沪深300ETF', '中证100ETF', '30年债券ETF', '道琼斯ETF', '黄金ETF']
    #时间窗口
    c.windows=3
    #最小的周期数量
    c.min_amount=9
    #当前的周期
    c.last_amount=3
    #5日均线
    c.mean_line=5
    #最低分
    c.min_score=25
    #买入排名
    c.rank=3
    #交易百分比,可以采用动态百分比安持股数量分配
    c.buy_ratio=0.33
    c.sell_ratio=0
    c.period_1='60m'
    c.df=pd.DataFrame()
    c.df['证券代码']=c.stock_list
    c.df['名称']=c.name_list
    print(c.df)
    #动量因子天数
    #老版本的回测需要设定股票池,配合历史数据使用
    c.set_universe(c.stock_list)
def handlebar(c):
    #当前K线索引号
    d=c.barpos
    df=c.df
    #获取当前K线日期
    #精确到分钟小周期,不然有未来函数
    today_1=timetag_to_datetime(c.get_bar_timetag(d),'%Y%m%d%H%M%S')
    #日线
    #today_1=timetag_to_datetime(c.get_bar_timetag(d),'%Y-%m-%d)
    print(today_1,"当前时间*****")
    #必须使用前复权
    #hist=c.get_history_data(100,'1d',['open','close','low','high'],1)
    #持股
    hold_stock=get_position(c,c.account,c.account_type)
    account=get_account(c,c.account,c.account_type)
    if hold_stock.shape[0]>0:
        hold_stock=hold_stock[hold_stock['持仓量']>=10]
        if hold_stock.shape[0]>0:
            hold_stock_list=hold_stock['证券代码'].tolist()
            hold_amount=hold_stock.shape[0]
        else:
            hold_stock_list=[]
            hold_amount=0
    else:
        hold_stock_list=[]
        hold_amount=0
    df=c.df
    if df.shape[0]>0:
        total_amount_list=[]
        last_amount_list=[]
        score_list=[]
        up_line_list=[]
        for stock in df['证券代码'].tolist():
            try:
                hist=c.get_market_data_ex(
                fields=[], 
                stock_code=[stock], 
                period=c.period_1, 
                start_time=str(c.start)[:8], 
                end_time=today_1, 
                count=-1, 
                fill_data=True, 
                subscribe=True)
                hist=hist[stock]
                df1=six_pulse_excalibur_hist(c,hist)
                df1['均线']=df1['close'].rolling(c.mean_line).mean()
                df1['站上均线']=df1['close']>df1['均线']
                score=mean_line_models(c,close_list=df1['close'].tolist())
                df1=df1[-c.windows:]
                up=df1['站上均线'].tolist()[-1]
                total_amount=sum(df1['signal'].tolist())
                last_amount=df1['signal'].tolist()[-1]
                total_amount_list.append(total_amount)
                last_amount_list.append(last_amount)
                score_list.append(score)
                up_line_list.append(up)
            except Exception as e:
                print(e,stock,'有问题')
                total_amount=None
                last_amount=None
                score=None
                total_amount_list.append(total_amount)
                last_amount_list.append(last_amount)
                score_list.append(score)
                up_line_list.append(None)


        df['数量']=total_amount_list
        df['最新数量']=last_amount_list
        df['分数']=score_list
        df['站上均线']=up_line_list
        #选择数据
        df=df[df['数量']>=c.min_amount]
        df=df[df['最新数量']>=c.last_amount]
        df=df[df['分数']>=c.min_score]
        df=df[df['站上均线']==True]
        #安总数量排序
        df=df.sort_values(by='数量', ascending=False)
        print('分析数据***********************')
        print(df)
        rank_stock_list=df['证券代码'][:c.rank]
        buy_list=[]
        sell_list=[]
        #买入
        for stock in rank_stock_list:
            if stock in hold_stock_list:
                print(stock,'买入分析在持有排名股票池继续持有')
            else:
                print(stock,'买入分析持有股票池排名没有在持股买入')
                buy_list.append(stock)
        #卖出
        for stock in hold_stock_list:
            if stock in rank_stock_list:
                print(stock,'卖出分析在持有排名股票池继续持有')
            else:
                print(stock,'卖出分析不在持有排名股票池卖出')
                sell_list.append(stock)
        for stock in sell_list:
            order_target_percent(stock, c.sell_ratio, c, c.account)
            print('{} 卖出股票{} '.format(today_1,stock))
        for stock in buy_list:
            order_target_percent(stock, c.buy_ratio, c, c.account)
            print('{} 买入股票{} '.format(today_1,stock))
def mean_line_models(c,close_list=[],x1=3,x2=5,x3=10,x4=15,x5=20):
    '''
    均线模型
    趋势模型
    5,10,20,30,60
    '''
    df=pd.DataFrame()
    df['close']=close_list
    #df=self.bond_cov_data.get_cov_bond_hist_data(stock=stock,start=start_date,end=end_date,limit=1000000000)
    df1=pd.DataFrame()
    df1['x1']=df['close'].rolling(window=x1).mean()
    df1['x2']=df['close'].rolling(window=x2).mean()
    df1['x3']=df['close'].rolling(window=x3).mean()
    df1['x4']=df['close'].rolling(window=x4).mean()
    df1['x5']=df['close'].rolling(window=x5).mean()
    score=0
    #加分的情况
    mean_x1=df1['x1'].tolist()[-1]
    mean_x2=df1['x2'].tolist()[-1]
    mean_x3=df1['x3'].tolist()[-1]
    mean_x4=df1['x4'].tolist()[-1]
    mean_x5=df1['x5'].tolist()[-1]
    #相邻2个均线进行比较
    if mean_x1>=mean_x2:
        score+=25
    if mean_x2>=mean_x3:
        score+=25
    if mean_x3>=mean_x4:
        score+=25
    if mean_x4>=mean_x5:
        score+=25
    return score
def six_pulse_excalibur_hist(c,df):


    markers=0
    signal=0
    #df=self.data.get_hist_data_em(stock=stock)
    CLOSE=df['close']
    LOW=df['low']
    HIGH=df['high']
    DIFF=EMA(CLOSE,8)-EMA(CLOSE,13)
    DEA=EMA(DIFF,5)
    #如果满足DIFF>DEA 在1的位置标记1的图标
    #DRAWICON(DIFF>DEA,1,1);
    markers+=IF(DIFF>DEA,1,0)
    #如果满足DIFF<DEA 在1的位置标记2的图标
    #DRAWICON(DIFF<DEA,1,2);
    markers+=IF(DIFF<DEA,1,0)
    #DRAWTEXT(ISLASTBAR=1,1,'. MACD'),COLORFFFFFF;{微信公众号:尊重市场}
    ABC1=DIFF>DEA
    signal+=IF(ABC1,1,0)
    尊重市场1=(CLOSE-LLV(LOW,8))/(HHV(HIGH,8)-LLV(LOW,8))*100
    K=SMA(尊重市场1,3,1)
    D=SMA(K,3,1)
    #如果满足k>d 在2的位置标记1的图标
    markers+=IF(K>D,1,0)
    #DRAWICON(K>D,2,1);
    markers+=IF(K<D,1,0)
    #DRAWICON(K<D,2,2);
    #DRAWTEXT(ISLASTBAR=1,2,'. KDJ'),COLORFFFFFF;
    ABC2=K>D
    signal+=IF(ABC2,1,0)
    指标营地=REF(CLOSE,1)
    RSI1=(SMA(MAX(CLOSE-指标营地,0),5,1))/(SMA(ABS(CLOSE-指标营地),5,1))*100
    RSI2=(SMA(MAX(CLOSE-指标营地,0),13,1))/(SMA(ABS(CLOSE-指标营地),13,1))*100
    markers+=IF(RSI1>RSI2,1,0)
    #DRAWICON(RSI1>RSI2,3,1);
    markers+=IF(RSI1<RSI2,1,0)
    #DRAWICON(RSI1<RSI2,3,2);
    #DRAWTEXT(ISLASTBAR=1,3,'. RSI'),COLORFFFFFF;
    ABC3=RSI1>RSI2
    signal+=IF(ABC3,1,0)
    尊重市场=-(HHV(HIGH,13)-CLOSE)/(HHV(HIGH,13)-LLV(LOW,13))*100
    LWR1=SMA(尊重市场,3,1)
    LWR2=SMA(LWR1,3,1)
    #DRAWICON(LWR1>LWR2,4,1);
    markers+=IF(LWR1>LWR2,1,0)
    #DRAWICON(LWR1<LWR2,4,2);
    markers+=IF(LWR1<LWR2,1,0)
    #DRAWTEXT(ISLASTBAR=1,4,'. LWR'),COLORFFFFFF;
    ABC4=LWR1>LWR2
    signal+=IF(ABC4,1,0)
    BBI=(MA(CLOSE,3)+MA(CLOSE,5)+MA(CLOSE,8)+MA(CLOSE,13))/4
    #DRAWICON(CLOSE>BBI,5,1);
    markers+=IF(CLOSE>BBI,1,0)
    #DRAWICON(CLOSE<BBI,5,2);
    markers+=IF(CLOSE<BBI,1,0)
    #DRAWTEXT(ISLASTBAR=1,5,'. BBI'),COLORFFFFFF;
    ABC10=7
    ABC5=CLOSE>BBI
    signal+=IF(ABC5,1,0)
    MTM=CLOSE-REF(CLOSE,1)
    MMS=100*EMA(EMA(MTM,5),3)/EMA(EMA(ABS(MTM),5),3)
    MMM=100*EMA(EMA(MTM,13),8)/EMA(EMA(ABS(MTM),13),8)
    markers+=IF(MMS>MMM,1,0)
    #DRAWICON(MMS>MMM,6,1);
    markers+=IF(MMS<MMM,1,0)
    #DRAWICON(MMS<MMM,6,2);
    #DRAWTEXT(ISLASTBAR=1,6,'. ZLMM'),COLORFFFFFF;
    ABC6=MMS>MMM
    signal+=IF(ABC6,1,0)
    df['signal']=signal
    df['markers']=markers
    return df
def get_account(c,accountid,datatype):
    '''
    获取账户数据
    '''
    accounts = get_trade_detail_data(accountid, datatype, 'account')
    result={}
    for dt in accounts:
        result['总资产']=dt.m_dBalance
        result['净资产']=dt.m_dAssureAsset
        result['总市值']=dt.m_dInstrumentValue
        result['总负债']=dt.m_dTotalDebit
        result['可用金额']=dt.m_dAvailable
        result['盈亏']=dt.m_dPositionProfit
    return result
#获取持仓信息{code.market:手数}
def get_position(c,accountid,datatype):
    '''
    获取持股数据
    '''
    positions = get_trade_detail_data(accountid,datatype, 'position')
    data=pd.DataFrame()
    if len(positions)>0:
        df=pd.DataFrame()
        for dt in positions:
            df['股票代码']=[dt.m_strInstrumentID]
            df['市场类型']=[dt.m_strExchangeID]
            df['证券代码']=df['股票代码']+'.'+df['市场类型']
            df['证券名称']=[dt.m_strInstrumentName]
            df['持仓量']=[dt.m_nVolume]
            df['可用数量']=[dt.m_nCanUseVolume]
            df['成本价']=[dt.m_dOpenPrice]
            df['市值']=[dt.m_dInstrumentValue]
            df['持仓成本']=[dt.m_dPositionCost]
            df['盈亏']=[dt.m_dPositionProfit]
            data=pd.concat([data,df],ignore_index=True)
    else:
        data=pd.DataFrame()
    return data 
def RD(N,D=3):   
    #四舍五入取3位小数 
    return np.round(N,D)        
def RET(S,N=1):  
    #返回序列倒数第N个值,默认返回最后一个
    return np.array(S)[-N]      
def ABS(S
):  
    #返回N的绝对值    
    return np.abs(S)
def MAX(S1,S2): 
    #序列max 
    return np.maximum(S1,S2)    
def MIN(S1,S2):  
     #序列min
    return np.minimum(S1,S2)   
def IF(S,A,B): 
    #序列布尔判断 return=A  if S==True  else  B  
    return np.where(S,A,B)      
def AND(S1,S2):
    #and
    return np.logical_and(S1,S2)
def OR(S1,S2):
    #or
    return np.logical_or(S1,S2)
def RANGE(A,B,C):
    '''
    期间函数
    B<=A<=C
    '''
    df=pd.DataFrame()
    df['select']=A.tolist()
    df['select']=df['select'].apply(lambda x: True if (x>=B and x<=C) else False)
    return df['select']


def REF(S, N=1):          #对序列整体下移动N,返回序列(shift后会产生NAN)    
    return pd.Series(S).shift(N).values  


def DIFF(S, N=1):         #前一个值减后一个值,前面会产生nan 
    return pd.Series(S).diff(N).values     #np.diff(S)直接删除nan,会少一行


def STD(S,N):             #求序列的N日标准差,返回序列    
    return  pd.Series(S).rolling(N).std(ddof=0).values     


def SUM(S, N):            #对序列求N天累计和,返回序列    N=0对序列所有依次求和         
    return pd.Series(S).rolling(N).sum().values if N>0 else pd.Series(S).cumsum().values  


def CONST(S):             #返回序列S最后的值组成常量序列
    return np.full(len(S),S[-1]) 
def HHV(S,N):             #HHV(C, 5) 最近5天收盘最高价        
    return pd.Series(S).rolling(N).max().values     


def LLV(S,N):             #LLV(C, 5) 最近5天收盘最低价     
    return pd.Series(S).rolling(N).min().values    


def HHVBARS(S,N):         #求N周期内S最高值到当前周期数, 返回序列
    return pd.Series(S).rolling(N).apply(lambda x: np.argmax(x[::-1]),raw=True).values 


def LLVBARS(S,N):         #求N周期内S最低值到当前周期数, 返回序列
    return pd.Series(S).rolling(N).apply(lambda x: np.argmin(x[::-1]),raw=True).values    


def MA(S,N):              #求序列的N日简单移动平均值,返回序列                    
    return pd.Series(S).rolling(N).mean().values  


def EMA(S,N):             #指数移动平均,为了精度 S>4*N  EMA至少需要120周期     alpha=2/(span+1)    
    return pd.Series(S).ewm(span=N, adjust=False).mean().values     


def SMA(S, N, M=1):       #中国式的SMA,至少需要120周期才精确 (雪球180周期)    alpha=1/(1+com)    
    return pd.Series(S).ewm(alpha=M/N,adjust=False).mean().values           #com=N-M/M


def DMA(S, A):            #求S的动态移动平均,A作平滑因子,必须 0<A<1  (此为核心函数,非指标)
    return pd.Series(S).ewm(alpha=A, adjust=True).mean().values


def WMA(S, N):            #通达信S序列的N日加权移动平均 Yn = (1*X1+2*X2+3*X3+...+n*Xn)/(1+2+3+...+Xn)
    return pd.Series(S).rolling(N).apply(lambda x:x[::-1].cumsum().sum()*2/N/(N+1),raw=True).values 


def AVEDEV(S, N):         #平均绝对偏差  (序列与其平均值的绝对差的平均值)   
    return pd.Series(S).rolling(N).apply(lambda x: (np.abs(x - x.mean())).mean()).values 


def SLOPE(S, N):          #返S序列N周期回线性回归斜率            
    return pd.Series(S).rolling(N).apply(lambda x: np.polyfit(range(N),x,deg=1)[0],raw=True).values


def FORCAST(S, N):        #返回S序列N周期回线性回归后的预测值, jqz1226改进成序列出    
    return pd.Series(S).rolling(N).apply(lambda x:np.polyval(np.polyfit(range(N),x,deg=1),N-1),raw=True).values  


def LAST(S, A, B):        #从前A日到前B日一直满足S_BOOL条件, 要求A>B & A>0 & B>=0 
    return np.array(pd.Series(S).rolling(A+1).apply(lambda x:np.all(x[::-1][B:]),raw=True),dtype=bool) 
    #------------------   1级:应用层函数(通过0级核心函数实现) ----------------------------------
def COUNT(S, N):                       # COUNT(CLOSE>O, N):  最近N天满足S_BOO的天数  True的天数
    return SUM(S,N)    


def EVERY(S, N):                       # EVERY(CLOSE>O, 5)   最近N天是否都是True
    return  IF(SUM(S,N)==N,True,False)                     
def EXIST(S, N):                       # EXIST(CLOSE>3010, N=5)  n日内是否存在一天大于3000点  
    return IF(SUM(S,N)>0,True,False)


def FILTER(S, N):                      # FILTER函数,S满足条件后,将其后N周期内的数据置为0, FILTER(C==H,5)
    for i in range(len(S)): S[i+1:i+1+N]=0  if S[i] else S[i+1:i+1+N]        
    return S                           # 例:FILTER(C==H,5) 涨停后,后5天不再发出信号 


def BARSLAST(S):                       #上一次条件成立到当前的周期, BARSLAST(C/REF(C,1)>=1.1) 上一次涨停到今天的天数 
    M=np.concatenate(([0],np.where(S,1,0)))  
    for i in range(1, len(M)):  M[i]=0 if M[i] else M[i-1]+1    
    return M[1:]                       


def BARSLASTCOUNT(S):                  # 统计连续满足S条件的周期数        by jqz1226
    rt = np.zeros(len(S)+1)            # BARSLASTCOUNT(CLOSE>OPEN)表示统计连续收阳的周期数
    for i in range(len(S)): rt[i+1]=rt[i]+1  if S[i] else rt[i+1]
    return rt[1:]  


def BARSSINCEN(S, N):                  # N周期内第一次S条件成立到现在的周期数,N为常量  by jqz1226
    return pd.Series(S).rolling(N).apply(lambda x:N-1-np.argmax(x) if np.argmax(x) or x[0] else 0,raw=True).fillna(0).values.astype(int)




def CROSS(S1, S2):                     #判断向上金叉穿越 CROSS(MA(C,5),MA(C,10))  判断向下死叉穿越 CROSS(MA(C,10),MA(C,5))   
    return np.concatenate(([False], np.logical_not((S1>S2)[:-1]) & (S1>S2)[1:]))    # 不使用0级函数,移植方便  by jqz1226
def CROSS_UP(S1, S2):                     #判断向上金叉穿越 CROSS(MA(C,5),MA(C,10))  判断向下死叉穿越 CROSS(MA(C,10),MA(C,5))   
    return np.concatenate(([False], np.logical_not((S1>S2)[:-1]) & (S1>S2)[1:]))    # 不使用0级函数,移植方便  by jqz1226
def CROSS_DOWN(S1, S2):                     
    return np.concatenate(([False], np.logical_not((S1<S2)[:-1]) & (S1<S2)[1:]))    # 不使用0级函数,移植方便  by jqz1226


def LONGCROSS(S1,S2,N):                #两条线维持一定周期后交叉,S1在N周期内都小于S2,本周期从S1下方向上穿过S2时返回1,否则返回0         
    return  np.array(np.logical_and(LAST(S1<S2,N,1),(S1>S2)),dtype=bool)            # N=1时等同于CROSS(S1, S2)


def VALUEWHEN(S, X):                   #当S条件成立时,取X的当前值,否则取VALUEWHEN的上个成立时的X值   by jqz1226
    return pd.Series(np.where(S,X,np.nan)).ffill().values 

[groupid=42]量化交易研究[/groupid]

回复

您需要登录后才可以回帖 登录 | 立即注册

客服专线

400-080-8112

用思考的速度交易,用真诚的态度合作,我们是认真的!
  • 关注公众号
  • 添加微信客服
Copyright © 2001-2025 迅投QMT社区 版权所有 All Rights Reserved. 蜀ICP备19002686号-2
关灯 快速发帖
扫一扫添加微信客服
QQ客服返回顶部
快速回复 返回顶部 返回列表