R-Breaker策略由Richard Saidenberg在1994年开发,这种策略在《Futures Truth》的评选中连续十五年位列最优策略前十名。不同于其他策略,R-Breaker结合了趋势跟随和反转两种交易方式,可以在趋势行情中实现盈利,并且在市场反转时及时止盈并改变交易方向。因此,它提供了相对较多的交易机会,至今依然在全球范围内广泛使用和研究。
R-Breaker是一种日内回转交易策略,属于短线交易类型。日内回转交易即在同一天内买入或卖出标的,然后再将其卖出或买入。这种策略利用标的的短期波动来获取利润,是快进快出、投机性强的交易方式,适合短线投资者。重要的是,在交易日结束时所有仓位需要平清,不持有过夜仓位,所以并不适用于股票市场。
R-Breaker策略主要包括反转和趋势跟随两部分。当没有持仓时,它执行趋势跟随策略; 当有持仓时,等待反转信号以改变交易方向。
由于中国A股市场实行的是“T+1”交易制度,这里我们以期货市场为例来演示R-Breaker策略。
反转和趋势突破的价格点是根据前一交易日的收盘价、最高价和最低价计算得出的,它们从小到大依次是:突破买入价、观察卖出价、反转卖出价、反转买入价、观察买入价和突破卖出价。具体的计算公式如下:
[td]
指标计算方法 | 观察卖出价 = High + 0.35 * (Close – Low) | 观察买入价 = Low – 0.35* (High – Close) | 反转卖出价 = 1.07 / 2 (High + Low) – 0.07 Low | 反转买入价 = 1.07 / 2 (High + Low) – 0.07 High | 突破买入价 = 观察卖出价 + 0.25 * (观察卖出价 – 观察买入价) | 突破卖出价 = 观察买入价 – 0.25 * (观察卖出价 – 观察买入价) |
在R-Breaker策略中,High、Close、Low分别代表昨日最高价、昨日收盘价和昨日最低价。根据它们,我们可以计算出六个价格点: 突破买入价、观察卖出价、反转卖出价、反转买入价、观察买入价和突破卖出价。这些价格点从大到小排列。
由此可见,R-Breaker策略使用前一天的价格数据来绘制类似于网格的价格线,并且每天更新这些价格线。这些价格线在技术分析中相当于支撑位和阻力位,并且这两者的角色可能会互换。比如,当价格成功突破上方阻力位时,该阻力位可能会变为新的支撑位;同样,当价格成功突破下方支撑位时,该支撑位可能会变为新的阻力位。
在实际交易中,这些支撑位和阻力位对交易者指明了开平仓的方向以及买卖的具**置。交易者可以根据盘中价格、中心价、阻力位、支撑位进行灵活设置开平仓条件。同时,他们也可以依据这些网格价格线进行头寸管理,如增减仓位。
简单来说,R_break策略的核心观点就是昨日的趋势会延续到今日,并推动今日行情的上涨/下跌
当今日的价格突破前一交易日的最高点,形态上来看会是上涨趋势,具备一定的开多仓条件,但还不够。若前一交易日的下影线越长,说明多空方博弈激烈,多方力量强大。因此可以设置更高的突破买入价,一旦突破说明多方力量稳稳地占据了上风,那么就有理由相信未来会继续上涨。同理可解释突破卖出价背后的逻辑。
持有多仓时,若标的价格持续走高,则在当天收盘之前平仓获利离场。若价格不升反降,跌破观察卖出价时,此时价格仍处于前一交易日最高价之上,继续观望。若继续下跌,直到跌破反转卖出价时,平仓止损。
持有空仓时,若标的价格持续走低,则在当天收盘之前平仓获利离场。若价格不降反升,升至观察买入价时,此时价格仍处于前一交易日最低价之下,继续观望。若继续上涨,直到升至反转买入价时,平仓止损。
这种策略可以适应大趋势与震荡两种市场情况,并且具有明确的停损规则和退出机制。
以下是策略源码,仅做演示分享使用
- # coding:gbk
- """
- R_break策略示例
- """
- import pandas as pd
- import numpy as np
- import datetime
- class _a():
- pass
-
- A = _a()
- def init(C):
-
- A.symbol = "rb00.SF" # 品种
-
- C.accID = "test"
- # C.accID = account # 实盘用这个
- A.period = "5m" # 数据周期
- A.lots = 1 # 买卖手数
-
-
- def after_init(C):
- # 不确定有没有本地数据的话,下一次本地数据
- if 1:
- print("正在下载数据")
- download_history_data(A.symbol, A.period,"","")
- print("数据下载完成,程序继续")
-
-
- # 开始获取数据
- data = C.get_market_data_ex([], [A.symbol], period = "1d")
- close_df = get_df_ex(data, "close")
- open_df = get_df_ex(data, "open")
- low_df = get_df_ex(data, "low")
- high_df = get_df_ex(data,"high")
-
- pivot_df = (high_df + close_df + low_df) / 3 # 计算枢纽点
-
- A.bBreak = high_df + 2 * (pivot_df - low_df) # 突破买入价
- A.sSetup = pivot_df + (high_df - low_df) # 观察卖出价
-
- A.sEnter = 2 * pivot_df - low_df # 反转卖出价
- A.bEnter = 2 * pivot_df - high_df # 反转买入价
- A.bSetup = pivot_df - (high_df - low_df) # 观察买入价
- A.sBreak = low_df - 2 * (high_df - pivot_df) # 突破卖出价
-
- A.bBreak = A.bBreak.shift(1)
- A.sSetup = A.sSetup.shift(1)
- A.sEnter = A.sEnter.shift(1)
- A.bEnter = A.bEnter.shift(1)
- A.bSetup = A.bSetup.shift(1)
- A.sBreak = A.sBreak.shift(1)
-
-
- def handlebar(C):
- backTestTime = timetag_to_datetime(C.get_bar_timetag(C.barpos),"%Y%m%d%H%M%S")
-
- print(backTestTime) # 打印当前bar的时间,觉得眼花可以注释掉
-
- bBreak = A.bBreak.loc[backTestTime[:8],A.symbol]
- sBreak = A.sBreak.loc[backTestTime[:8],A.symbol]
- sSetup = A.sSetup.loc[backTestTime[:8],A.symbol]
- sEnter = A.sEnter.loc[backTestTime[:8],A.symbol]
- bSetup = A.bSetup.loc[backTestTime[:8],A.symbol]
- bEnter = A.bEnter.loc[backTestTime[:8],A.symbol]
-
- holdings = get_Future_holdings(C.accID,symbol = A.symbol) #获取账户持仓信息
-
- position_long = holdings.get("多头数量",0)
- position_short = holdings.get("空头数量",0)
-
- # 获取行情
- data = C.get_market_data_ex([],[A.symbol],period = A.period, end_time = backTestTime, count = 2)
- last_price = data[A.symbol].iloc[-1]["close"]
- high = data[A.symbol].iloc[-1]["high"]
- low = data[A.symbol].iloc[-1]["low"]
-
- # print(last_price,bBreak)
-
- if position_long == 0 and position_short == 0:
- if last_price > bBreak:
- # 在空仓的情况下,如果盘中价格超过突破买入价,则采取趋势策略,即在该点位开仓做多
- my_passorder(C,A.symbol,"buy_open",A.lots)# 做多
- print("空仓,盘中价格超过突破买入价: 开仓做多")
- elif last_price < sBreak:
- my_passorder(C,A.symbol,"sell_open",A.lots) # 做空
- print("空仓,盘中价格跌破突破卖出价: 开仓做空")
- else:
-
- # 反转的情况
- if position_long:
-
- if high > sSetup and last_price < sEnter:
- # 多头持仓,当日内最高价超过观察卖出价后,
- # 盘中价格出现回落,且进一步跌破反转卖出价构成的支撑线时,
- # 采取反转策略,即在该点位反手做空
- my_passorder(C,A.symbol, "sell_close", position_long) # 平多
- my_passorder(C,A.symbol,"sell_open",A.lots) # 做空
- print("盘中价格出现回落:反转做空")
- elif position_short:
- if low < bSetup and last_price > bEnter:
- # 空头持仓,当日内最低价低于观察买入价后,
- # 盘中价格出现反弹,且进一步超过反转买入价构成的阻力线时,
- # 采取反转策略,即在该点位反手做多
-
- my_passorder(C,A.symbol, "buy_close", position_short) # 平空
- my_passorder(C,A.symbol,"buy_open",A.lots)# 做多
- print("盘中价格出现反弹:反转做多")
-
- if backTestTime[-6:] == "144500":
- # 收盘前平掉所有仓位
- if position_long:
- my_passorder(C,A.symbol, "sell_close", position_long) # 平多
- if position_short:
- my_passorder(C,A.symbol, "buy_close", position_short) # 平空
- print("收盘平仓")
- return
- def get_df_ex(data:dict,field:str) -> pd.DataFrame:
- '''
- ToDo:用于在使用get_market_data_ex的情况下,取到标准df
-
- Args:
- data: get_market_data_ex返回的dict
- field: ['time', 'open', 'high', 'low', 'close', 'volume','amount', 'settelementPrice', 'openInterest', 'preClose', 'suspendFlag']
-
- Return:
- 一个以时间为index,标的为columns的df
-
- '''
- _index = data[list(data.keys())[0]].index.tolist()
- _columns = list(data.keys())
- df = pd.DataFrame.from_dict({col: data[col][field] for col in _columns})
- return df
- def get_Future_holdings(accid,symbol = None):
- '''
- 针对期货返回持仓的奇葩结构做处理
- Arg:
- accondid:账户id
- symbol: 品种,不填默认返会全部持仓
-
- return:
- {股票名:{'手数':int,"持仓成本":float,'浮动盈亏':float,"可用余额":int}}
- '''
- datatype = "FUTURE"
-
- PositionInfo_dict = {}
-
- Long_dict={}
-
- Short_dict={}
-
- resultlist = get_trade_detail_data(accid,datatype,'POSITION')
-
- for obj in resultlist:
- #防除零
- if obj.m_nVolume == 0:
- continue
- if obj.m_nDirection == 48:
- if not Long_dict.get(obj.m_strInstrumentID+"."+obj.m_strExchangeID):
- Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID] = {
- "多头数量":obj.m_nVolume,
- "多头成本":obj.m_dOpenPrice,
-
- "浮动盈亏":obj.m_dFloatProfit,
- "保证金占用":obj.m_dMargin
- }
- else:
-
- Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["多头数量"] += obj.m_nVolume
- # 算浮动盈亏
- Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["浮动盈亏"] += obj.m_dFloatProfit
- # 算保证金占用
- Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["保证金占用"] += obj.m_dMargin
- # 算多头成本
- Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["多头成本"] = (
- Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["多头成本"] * \
- (Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["多头数量"] - obj.m_nVolume) + \
- (obj.m_dOpenPrice * obj.m_nVolume)
- )/Long_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["多头数量"]
-
- elif obj.m_nDirection == 49:
- if not Short_dict.get(obj.m_strInstrumentID+"."+obj.m_strExchangeID):
- Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID] = {
- "空头数量":obj.m_nVolume ,
- "空头成本":obj.m_dOpenPrice ,
- "浮动盈亏":obj.m_dFloatProfit,
- "保证金占用":obj.m_dMargin
- }
- else:
- Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["空头数量"] += obj.m_nVolume
- # 算浮动盈亏
- Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["浮动盈亏"] += obj.m_dFloatProfit
- # 算保证金占用
- Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["保证金占用"] += obj.m_dMargin
- # 计算空头成本
- Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["空头成本"] = (
- Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["空头成本"] * \
- (Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["空头数量"] - obj.m_nVolume) + \
- (obj.m_dOpenPrice * obj.m_nVolume)
- )/Short_dict[obj.m_strInstrumentID+"."+obj.m_strExchangeID]["空头数量"]
-
-
- for _symbol in set(list(Long_dict.keys()) + list(Short_dict.keys())):
-
- PositionInfo_dict[_symbol] = {
- "多头数量":Long_dict[_symbol]["多头数量"] if Long_dict.get(_symbol) else 0 ,
-
- "空头数量":Short_dict[_symbol]["空头数量"] if Short_dict.get(_symbol) else 0 ,
-
- "多头成本":Long_dict[_symbol]["多头成本"] if Long_dict.get(_symbol) else None ,
-
- "空头成本":Short_dict[_symbol]["空头成本"] if Short_dict.get(_symbol) else None,
-
- "净持仓" : Long_dict.get(_symbol,{}).get("多头数量",0) - Short_dict.get(_symbol,{}).get("空头数量",0),
-
- "浮动盈亏": Long_dict.get(_symbol,{}).get("浮动盈亏",0) + Short_dict.get(_symbol,{}).get("浮动盈亏",0),
-
- "保证金占用": Long_dict.get(_symbol,{}).get("保证金占用",0) + Short_dict.get(_symbol,{}).get("保证金占用",0)
- }
-
- if symbol:
- return PositionInfo_dict.get(symbol,{})
- else :
- return PositionInfo_dict
- def my_passorder(C,Futuer:str,opentype:str,lots:int,price = None,m_strRemark = '系统备注'):
- '''
-
- Args:
- C: ContextInfo \n
- Futuer: 期货代码 \n
-
- opentype:
- 'buy_open' 开多\n
- 'sell_open' 开空\n
- 'sell_close' 平多\n
- 'buy_close' 平空\n
- lots: 手
- price: 下单价格,不指定时默认按市价下单
- m_strRemark = '系统备注' 用于自定义寻找orderID
- '''
- Futuer_ExchangeID = Futuer.split(".")[1]
- opentype = opentype #买卖方向
- op =1101 #手数
- # 期货区分开平
- if opentype == "buy_open":
- opType = 0
- elif opentype == "sell_open":
- opType = 3
- elif opentype == "sell_close":
- opType = 7
- elif opentype == "buy_close":
- opType = 9
- volumex = lots
- price = 0 if not price else price # price参数必须存在
- if Futuer_ExchangeID == "SF":
- prType = 14 if not price else 11 # 对于上期所,若不指定价格,则默认按对手价下单
- elif Futuer_ExchangeID == "DF" or Futuer_ExchangeID == "ZF":
- prType = 12 if not price else 11 # 对于大商所和郑商所,若不指定价格,则默认按涨跌停价下单
- else:
- prType = 14 if not price else 11 # 对于其他所,若不指定价格,则默认按对手价下单
- print(f'{Futuer} 新委托信息 方向{opentype} 价格{price} 量{volumex}')
- #print(f"opType:{opType} , op:{op} , C.accID{C.accID} , stock{stock} , prType{prType} , price{price} , volumex{volumex}")
- passorder(opType, op, C.accID,Futuer, prType, price, volumex,'交易注释',1,'{}'.format(m_strRemark), C)
- print(f'委托发送完成')
复制代码
|