#encoding:gbk
'''
ALPHALENS因子分析
'''
import pandas as pd
import numpy as np
import alphalens
from alphalens import utils
from alphalens import plotting
from alphalens import tears
from alphalens import performance
import math
import statsmodels.api as sm
import config
def init(ContextInfo):
    """Backtest initialization: set the stock universe and cache the trading calendar."""
    ContextInfo.index = config.trade_index
    # Constituents of the configured benchmark index.
    members = ContextInfo.get_sector(ContextInfo.index)
    # Drop 601728.SH from the pool (reason not recorded here — presumably
    # missing data for this stock; confirm before re-including).
    members.remove('601728.SH')
    ContextInfo.trade_code_list = members
    ContextInfo.set_universe(members)
    # Cache trading dates between the configured start/end dates for later alignment.
    ContextInfo.tradedate = _get_trading_dates(ContextInfo, config.startDate, config.endDate)
def handlebar(ContextInfo):
    """Per-bar callback.

    On the configured backtest end date only, runs the full alphalens factor
    pipeline: load prices / industry / weights / market cap, clean the factor
    (winsorize, standardize, neutralize), reshape to a (date, asset)
    MultiIndex, and print the information coefficient.
    """
    barpos = ContextInfo.barpos
    # Resolve the current bar's timestamp to a YYYYMMDD date string.
    realtime = ContextInfo.get_bar_timetag(barpos)
    ContextInfo.curDate = timetag_to_datetime(realtime,'%Y%m%d')
    #print(ContextInfo.curDate)
    #if not ContextInfo.is_last_bar():
    if ContextInfo.curDate == config.endDate:# backtest end date — run the analysis exactly once
        print('数据初始化...')
        # Daily prices (dates x stocks) used for forward returns.
        df_prices = _get_price( ContextInfo, ContextInfo.trade_code_list,config.startDate, config.endDate)
        print('df_prices...', df_prices.head())
        # Industry grouping dict for alphalens' groupby argument.
        df_industry = get_groupby( ContextInfo.trade_code_list, config.startDate, config.endDate)
        print('df_industry...', df_industry)
        df_weights = get_weights(ContextInfo, ContextInfo.trade_code_list, config.startDate, config.endDate)
        print('df_weights...', df_weights.head())
        #dataapi = DataApi(ContextInfo, price = config.price, fq=config.fq, industry=config.industry,weight_method=config.weight_method)
        # Market-cap data, transposed to (stock x 'YYYY-MM-DD' date string)
        # as expected by get_neutralization().
        df = _get_market_cap(ContextInfo, ContextInfo.trade_code_list, config.startDate, config.endDate, ln=False)
        df_cap= pd.DataFrame(df.values.T, index=df.columns, columns=df.index.strftime('%Y-%m-%d'))
        print('df_cap...', df_cap.head())
        print('获取因子数据...')
        factor_data = get_factor_data_format(ContextInfo, ContextInfo.trade_code_list, config.factor_list, config.startDate, config.endDate)
        print(factor_data.head())
        print('去极值n=5.2...')
        # Winsorize at mean +/- 5.2 std (looser than the usual 3-sigma).
        factor_data = extreme_3sigma(factor_data, n=5.2)
        print(factor_data.head())
        print('标准化...')
        factor_data = standardize(factor_data)
        print(factor_data.head())
        print('市值、行业中性化...')
        factor_data = get_neutralization(ContextInfo, factor_data, df_cap, industry = True)
        print(factor_data.head())
        print('复合索引处理...')
        #m_factor_data = get_multiIndex_Factors(ContextInfo, factor_data )
        # Inline reshape of the wide frame to alphalens' (date, asset)
        # MultiIndex format (duplicates get_multiIndex_Factors below).
        m_factor_data = pd.DataFrame()
        for stock in ContextInfo.trade_code_list:
            df = pd.DataFrame( data = factor_data[stock])
            df.index.set_names(['date'], inplace=True)
            df.columns=['factor_value']
            df['asset'] =stock
            df.reset_index(inplace = True)
            df =df.set_index(['date','asset'])
            m_factor_data = m_factor_data.append(df)
        m_factor_data.sort_index(inplace=True)
        print(m_factor_data.head())
        print('因子进行分析...')
        fac_data = alphalens.utils.get_clean_factor_and_forward_returns(m_factor_data,
                                                                        df_prices,
                                                                        groupby=df_industry,
                                                                        binning_by_group=False,
                                                                        quantiles=5,
                                                                        bins=None,
                                                                        periods=(1, 5, 10),
                                                                        filter_zscore=20,
                                                                        groupby_labels=None,
                                                                        max_loss=0.35,
                                                                        zero_aware=False,
                                                                        cumulative_returns=True)
        #print(fac_data.index.levels[0])
        # Spearman IC of the factor versus forward returns per period.
        ic = alphalens.performance.factor_information_coefficient(fac_data)
        print('IC...',ic)
        '''
        alphalens.tears.create_full_tear_sheet(fac_data,
        long_short=True,
        group_neutral=True,
        by_group=True)
        '''
def get_multiIndex_Factors(ContextInfo, factor_data):
    """Reshape a wide date-by-stock factor frame to alphalens' MultiIndex format.

    Parameters
    ----------
    ContextInfo : platform context; only ``trade_code_list`` is read.
    factor_data : pd.DataFrame indexed by date, one column per stock code.

    Returns
    -------
    pd.DataFrame with MultiIndex (date, asset), a single 'factor' column,
    sorted ascending by index. Empty input pool yields an empty frame.
    """
    frames = []
    for stock in ContextInfo.trade_code_list:
        df = pd.DataFrame(data=factor_data[stock])
        df.index.set_names(['date'], inplace=True)
        df.columns = ['factor']
        df['asset'] = stock
        df.reset_index(inplace=True)
        df = df.set_index(['date', 'asset'])
        frames.append(df)
    # Single concat is O(n) and works on pandas >= 2.0, where the
    # per-iteration DataFrame.append used originally was removed.
    multiIndexFactors = pd.concat(frames) if frames else pd.DataFrame()
    multiIndexFactors.sort_index(ascending=True, inplace=True)
    return multiIndexFactors
def get_factor_data_format(ContextInfo, securities, factor_list, startDate, endDate):
    """Pull factor values for each security and align them on the trading calendar.

    Returns a DataFrame with a naive datetime index (one row per cached
    trading date) and one column per stock; gaps are back- then forward-filled.
    """
    # Calendar skeleton indexed by 'YYYYMMDD' strings so the per-stock
    # frames (converted to the same strings) align on assignment.
    calendar = pd.DataFrame(ContextInfo.tradedate, columns=['date']).set_index('date')
    calendar.index = pd.to_datetime(calendar.index).tz_localize("UTC").strftime('%Y%m%d')
    for code in securities:
        raw = ContextInfo.get_factor_data(factor_list, code, startDate, endDate)
        # Platform timestamps are epoch milliseconds; map to local trade dates.
        raw.index = pd.to_datetime(raw.index, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y%m%d')
        calendar[code] = raw
    calendar.index = pd.to_datetime(calendar.index)
    return calendar.bfill().ffill()
# Sigma-based winsorization of cross-sectional data.
def extreme_3sigma(dt, n=3):
    """Clip each column of *dt* to the band [mean - n*std, mean + n*std]."""
    center = dt.mean()   # per-column mean of the cross-section
    spread = dt.std()    # per-column standard deviation
    lower = center - n * spread
    upper = center + n * spread
    # Values beyond the band are snapped to the nearest bound.
    return dt.clip(lower, upper, axis=1)
# Normalization helper.
def standardize(s, ty=2):
    """Normalize *s* (Series or DataFrame), dropping NaN rows first.

    Parameters
    ----------
    s : pd.Series or pd.DataFrame
    ty : int-like normalization type:
        1 -> min-max scaling to [0, 1]
        2 -> z-score standardization (default)
        3 -> scale by the next power of ten above the max absolute value

    Returns
    -------
    Normalized copy of the non-NaN data.

    Raises
    ------
    ValueError
        For an unrecognized *ty*. (The original fell through to an
        undefined variable and raised a confusing NameError.)
    """
    data = s.dropna().copy()
    ty = int(ty)  # normalize once; original compared int(ty) and raw ty inconsistently
    if ty == 1:
        return (data - data.min()) / (data.max() - data.min())
    if ty == 2:
        return (data - data.mean()) / data.std()
    if ty == 3:
        return data / 10 ** np.ceil(np.log10(data.abs().max()))
    raise ValueError('ty must be 1, 2 or 3, got %r' % ty)
# Neutralization driver.
# factor_data: DataFrame (dates x stocks); df_cap: DataFrame
# (stocks x 'YYYY-MM-DD' date-string columns) of market caps.
# Returns a frame of the same shape as factor_data whose values are the
# per-date cross-sectional regression residuals from neutralization().
def get_neutralization(ContextInfo, factor_data, df_cap, industry=False):
    # Transpose to stocks x date-strings so each column is one cross-section.
    by_stock = pd.DataFrame(factor_data.values.T,
                            index=factor_data.columns,
                            columns=factor_data.index.strftime('%Y-%m-%d'))
    residuals = pd.DataFrame()
    for day in factor_data.index.strftime('%Y-%m-%d'):
        day_factor = pd.Series(by_stock[day].values, index=by_stock.index)
        day_cap = pd.Series(df_cap[day].values, index=df_cap.index)
        residuals[day] = neutralization(ContextInfo, day_factor,
                                        mkt_cap=day_cap, industry=industry)
    # Transpose back to dates x stocks with a real datetime index.
    out = pd.DataFrame(residuals.values.T, index=residuals.columns,
                       columns=residuals.index)
    out.index = pd.to_datetime(out.index)
    return out
def neutralization(ContextInfo, factor, mkt_cap=False, industry=False):
    """Cross-sectional neutralization: regress *factor* on log market cap
    and/or industry dummies, returning the OLS residuals.

    Parameters
    ----------
    factor : pd.Series of factor values indexed by stock code.
    mkt_cap : pd.Series of market caps on the same index, or False to skip
        the size exposure.
    industry : bool — include industry dummy exposures when True.

    Raises
    ------
    ValueError
        When neither a market-cap Series nor industry=True is supplied.
        (The original left ``x`` undefined and raised a NameError.)
    """
    df = pd.DataFrame([], index=factor.index)
    y = factor
    if isinstance(mkt_cap, pd.Series):
        # Log-transform so the size exposure is roughly linear.
        LnMktCap = mkt_cap.apply(lambda v: math.log(v))
        if industry:  # size + industry
            dummy_industry = get_industry_exposure(ContextInfo, df)
            x = pd.concat([LnMktCap, dummy_industry], axis=1)
        else:  # size only
            x = LnMktCap
    elif industry:  # industry only
        dummy_industry = get_industry_exposure(ContextInfo, df)
        x = dummy_industry
    else:
        raise ValueError('neutralization requires a market-cap Series, industry=True, or both')
    # NOTE: no intercept is added, matching the original regression spec.
    result = sm.OLS(y.astype(float), x.astype(float)).fit()
    return result.resid
# Build the industry dummy-exposure matrix for the stock pool
# (helper for neutralization()). Returns *df* extended with one 0/1
# column per Shenwan level-1 industry code from config.sw_l1.
def get_industry_exposure(ContextInfo, df):
    df = df.copy()
    for sector in config.sw_l1:  # Shenwan level-1 industry codes
        codes = ContextInfo.get_stock_list_in_sector(sector)
        if codes:
            # Mark sector members with 1; pool stocks outside the sector
            # align to NaN here and are zero-filled below.
            df[sector] = pd.Series([1] * len(codes), index=codes)
        else:
            # Empty sector: explicit zero column for every pool stock.
            df[sector] = pd.Series([0] * len(df), index=df.index)
    return df.fillna(0)
def _get_price(ContextInfo, securities, start_date=None, end_date=None, count=None,
               fields=None, dividend_type='none'):
    """Daily price frame (dates x stocks) used for alphalens forward returns.

    The ``count``, ``fields`` and ``dividend_type`` parameters are accepted
    but ignored; the field and dividend type come from ``config`` instead.

    NOTE(review): ``.minor_xs`` is the legacy pandas Panel API (removed in
    pandas 0.25) applied to the platform's get_market_data result — confirm
    the pandas version before upgrading.
    """
    #start_date = date2str(start_date) if start_date is not None else None
    #end_date = date2str(end_date) if end_date is not None else None
    # Pull one price field for all stocks, then slice the Panel down to a
    # 2-D frame of that field.
    df_price = ContextInfo.get_market_data(fields= [config.price], stock_code = securities, start_time =start_date, end_time = end_date,
        period = '1d', dividend_type =config.dividend_type ).minor_xs(config.price)#.transpose(2, 1, 0).to_frame()
    #print(df_price)
    #df_price = pd.read_csv('df_price.csv', index_col= [0])
    df_price.index = pd.to_datetime(df_price.index)#.tz_localize("UTC").strftime("%Y-%m-%d")
    #df_price = df_price.fillna(value = 0)
    # Fill gaps backward first, then forward, so no NaNs remain.
    df_price = df_price.fillna(method="bfill").fillna(method="ffill")
    return df_price
def _get_industry(securities, start_date, end_date, industry='SW'):
    """Map each security to its industry name under the given classification.

    ``start_date``/``end_date`` are accepted for interface symmetry but
    unused — the platform lookup is not date-ranged.
    """
    return {code: get_industry_name_of_stock(industry, code) for code in securities}
def get_groupby(securities, start_date, end_date):
    """Industry grouping for alphalens, using the classification from config."""
    return _get_industry(securities=securities, start_date=start_date,
                         end_date=end_date, industry=config.industry)
def _get_market_cap(ContextInfo, securities, start_date, end_date, ln=False):
    """Total market cap aligned on the cached trading calendar.

    Returns a DataFrame (datetime index x stock columns); pass ln=True for
    log market cap. Gaps are back- then forward-filled.
    """
    factor_fields = ['Valuation_and_Market_Cap.MktValue']
    caps = pd.DataFrame(ContextInfo.tradedate, columns=['date']).set_index('date')
    for code in securities:
        raw = ContextInfo.get_factor_data(factor_fields, code, start_date, end_date)
        # Epoch-millisecond timestamps -> local (Asia/Shanghai) trade-date strings.
        raw.index = pd.to_datetime(raw.index, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y%m%d')
        raw.columns = [code]
        caps[code] = raw
    caps.index = pd.to_datetime(caps.index)
    caps = caps.bfill().ffill()
    return np.log(caps) if ln else caps
def _get_circulating_market_cap(ContextInfo, securities, start_date, end_date,
                                ln=False):
    """Circulating (free-float) market cap aligned on the cached trading calendar.

    Returns a DataFrame (datetime index x stock columns); pass ln=True for
    the log values. Gaps are back- then forward-filled.

    Bug fix: the original assigned and returned an undefined name
    ``cmarket_cap``, so every call raised NameError; it now uses
    ``market_cap`` consistently.
    """
    fieldList = ['Valuation_and_Market_Cap.NegMktValue']
    tradedate = ContextInfo.tradedate
    market_cap = pd.DataFrame(tradedate)
    market_cap = market_cap.set_index([0], drop=True)
    for stock in securities:
        df = ContextInfo.get_factor_data(fieldList, stock, start_date, end_date)
        # Epoch-millisecond timestamps -> local trade-date strings.
        df.index = pd.to_datetime(df.index, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y%m%d')
        df.columns = [stock]
        market_cap[stock] = df
    market_cap.index = pd.to_datetime(market_cap.index)
    market_cap = market_cap.fillna(method="bfill").fillna(method="ffill")
    if ln:
        market_cap = np.log(market_cap)
    return market_cap
def _get_average_weights(securities, start_date, end_date):
    """Equal weight (1.0) for every security; dates are accepted but unused."""
    return dict.fromkeys(securities, 1.0)
def get_weights(ContextInfo, securities, start_date, end_date):
    """Asset weights for alphalens, dispatched on config.weight_method.

    Supported methods:
      'avg'                     -> equal weights (dict)
      'mktcap' / 'ln_mktcap'    -> total market cap, optionally logged
      'cmktcap' / 'ln_cmktcap'  -> circulating market cap, optionally logged

    Raises ValueError for any other method.
    """
    method = config.weight_method
    if method == 'avg':
        return _get_average_weights(securities, start_date, end_date)
    if method in ('mktcap', 'ln_mktcap'):
        return _get_market_cap(ContextInfo, securities, start_date, end_date,
                               ln=(method == 'ln_mktcap'))
    if method in ('cmktcap', 'ln_cmktcap'):
        return _get_circulating_market_cap(ContextInfo, securities, start_date, end_date,
                                           ln=(method == 'ln_cmktcap'))
    raise ValueError('invalid weight_method')
def _get_trading_dates(ContextInfo, start_date, end_date):
    """Trading dates between start_date and end_date, read off the daily
    bar index of the benchmark index configured in config.trade_index."""
    bars = ContextInfo.get_market_data(fields=[config.price],
                                       stock_code=[config.trade_index],
                                       start_time=start_date,
                                       end_time=end_date,
                                       period='1d',
                                       dividend_type=config.dividend_type)
    return bars.index.values