制一个全新的策略程序代码,直接修改完整写出来,直接能在国金QMT软件导入策略在实盘使用,新策略的逻辑如下: 从通达信板块同步板块指数成分股,路径D:\国金证券\T0002\blocknew\BKHTD.blk 选股逻辑: 1、选出同步过来所有板块的成分股 ,从成分股选出满足条件大于均线20的个股,并且返回(最好自动同步)到通达信自建的候选池目录
encoding:gbk
version = '1.0.0'
author = '量化策略'
date = '2025-06-07'
description = '板块成分股20日均线选股策略'
warning = '策略代码仅供研究使用,请勿外传'
import os
import time
import numpy as np
import MyTT as mt
from datetime import datetime
常量定义
TDX_BLOCK_PATH = r"D:\国金证券\T0002\blocknew\BKHTD.blk" # 通达信板块文件路径
CANDIDATE_POOL = "候选池" # 自建候选池名称
LOG_PATH = r"C:\QMT_LOG" # 日志路径
class BlockSelector:
def init(self, context):
self.context = context
self.logger = self.init_logger()
self.last_sync_time = 0 # 上次同步时间戳
def init_logger(self):
"""初始化日志系统"""
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH)
log_file = os.path.join(LOG_PATH, f"block_selector_{datetime.today().strftime('%Y%m%d')}.log")
return open(log_file, 'a', encoding='gbk')
def log(self, message):
"""记录日志"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_msg = f"{timestamp} - {message}\n"
self.logger.write(log_msg)
self.logger.flush()
print(log_msg.strip()) # 控制台输出
def read_block_file(self):
"""读取通达信板块文件"""
if not os.path.exists(TDX_BLOCK_PATH):
self.log(f"错误:板块文件不存在 {TDX_BLOCK_PATH}")
return []
try:
with open(TDX_BLOCK_PATH, 'r', encoding='gbk') as f:
stock_codes = [line.strip() for line in f if line.strip()]
return list(set(stock_codes)) # 去重
except Exception as e:
self.log(f"读取板块文件失败: {str(e)}")
return []
def convert_to_qmt_format(self, tdx_code):
"""将通达信代码转换为QMT格式"""
if tdx_code.startswith('6'):
return f"{tdx_code}.SH"
elif tdx_code.startswith(('0', '3')):
return f"{tdx_code}.SZ"
elif tdx_code.startswith('4'):
return f"{tdx_code}.BJ"
else:
self.log(f"未知股票代码格式: {tdx_code}")
return None
def calculate_ma20(self, stock_code):
"""计算20日均线"""
try:
# 获取最近21个交易日的收盘价
data = self.context.get_market_data_ex(
fields=['close'],
stock_code=[stock_code],
period='1d',
count=21,
dividend_type='front'
).get(stock_code, None)
if data is None or len(data) < 20:
self.log(f"{stock_code} 数据不足,无法计算20日均线")
return None, None
closes = np.array(data['close'])
ma20 = mt.MA(closes, 20)[-1] # 最新20日均线值
last_close = closes[-1] # 最新收盘价
return last_close, ma20
except Exception as e:
self.log(f"计算20日均线出错 {stock_code}: {str(e)}")
return None, None
def sync_blocks(self):
"""同步板块成分股"""
current_time = time.time()
if current_time - self.last_sync_time < 300: # 5分钟内不重复同步
return
self.log("开始同步板块成分股...")
tdx_codes = self.read_block_file()
if not tdx_codes:
self.log("未获取到板块成分股")
return
# 创建候选池(如果不存在)
create_sector('我的', CANDIDATE_POOL, True)
# 清空候选池
existing_stocks = get_stock_list_by_sector(CANDIDATE_POOL)
for stock in existing_stocks:
remove_stock_from_sector(CANDIDATE_POOL, stock)
qualified_stocks = []
for tdx_code in tdx_codes:
qmt_code = self.convert_to_qmt_format(tdx_code)
if not qmt_code:
continue
last_close, ma20 = self.calculate_ma20(qmt_code)
if last_close is None or ma20 is None:
continue
# 判断是否满足条件:收盘价大于20日均线
if last_close > ma20:
add_stock_to_sector(CANDIDATE_POOL, qmt_code)
qualified_stocks.append(qmt_code)
self.log(f"添加 {qmt_code} 到候选池 | 收盘价:{last_close:.2f} > 20日均线:{ma20:.2f}")
self.log(f"同步完成! 共处理 {len(tdx_codes)} 只股票, 满足条件 {len(qualified_stocks)} 只")
self.last_sync_time = current_time
def close(self):
"""关闭资源"""
if self.logger:
self.logger.close()
def init(ContextInfo):
创建策略实例
ContextInfo.selector = BlockSelector(ContextInfo)
# 设置定时任务:开盘前同步一次,盘中每小时同步一次
today = datetime.today().strftime("%Y-%m-%d")
ContextInfo.run_time("before_market_open", "", f"{today} 08:50:00")
ContextInfo.run_time("market_hour_sync", "60m", f"{today} 10:00:00")
ContextInfo.selector.log("策略初始化完成")
def before_market_open(ContextInfo):
"""开盘前任务"""
ContextInfo.selector.log("开盘前同步板块成分股")
ContextInfo.selector.sync_blocks()
def market_hour_sync(ContextInfo):
"""盘中定时同步"""
if not ContextInfo.is_trading_time(): # 非交易时间不执行
return
ContextInfo.selector.log("盘中定时同步板块成分股")
ContextInfo.selector.sync_blocks()
def handlebar(ContextInfo):
"""K线处理函数"""
pass # 本策略不需要处理K线
def exit(ContextInfo):
"""策略退出时清理资源"""
ContextInfo.selector.log("策略退出")
ContextInfo.selector.close()