本人自用,可以从L1 Tick转成全市场分钟数据。不保证没错,仅供参考。欢迎讨论。
from xtquant import xtdata
import numpy as np
from datetime import datetime
from zoneinfo import ZoneInfo
from typing import Callable, Dict
class L1TickToMinute:
def __init__(self, minute_windows, on_minute_closed_callback: Callable[[Dict], None]):
self.on_minute_closed_callback = on_minute_closed_callback
self.minute_windows = minute_windows
# 当前分钟窗口索引
self.minute_idx = 0
# 当前K线数据
self.open = None
self.high = None
self.low = None
self.close = None
self.start_volume = 0 # 当前k线对应的开始成交股数
self.end_volume = 0 # 当前k线对应的结束成交股数
self.start_amount = 0 # 当前k线对应的开始成额
self.end_amount = 0 # 当前k线对应的结束成交额
self.prev_low = None # 前一个tick的low值
self.prev_high = None # 前一个tick的high值
# 上一根K线收盘价(用于无成交K线)
self.last_close = 0.0
def update(self, tick: Dict):
"""
处理新的Tick,到达边界时生成分钟K线
"""
if self.minute_idx == 240:
return
ts = tick["time"]
price = tick["lastPrice"]
high = tick['high']
low = tick['low']
cumulated_volume = tick["volume"]
cumulated_amount = tick["amount"]
# 没到9:25,跳过
if (ts < self.minute_windows[0][0]) | (price == 0):
return
# 9:31, 13:01的初始值
if self.open is None:
self.open = price
self.high = price
self.low = price
self.prev_high = high
self.prev_low = low
# 最后一根K线依赖的Tick时间戳,大于15:00:00的就不断更新,不判断终止。
if self.minute_idx == 239:
if ts >= self.minute_windows[self.minute_idx][1]:
self.open = price
self.high = price
self.low = price
self.close = price
self.end_volume = cumulated_volume
self.end_amount = cumulated_amount
return
if ts >= self.minute_windows[self.minute_idx][1]:
# 上一根K线结束,吐出
if self.minute_idx == 119: # 11:30的处理
self.end_volume = cumulated_volume
self.end_amount = cumulated_amount
self.close = price
if high > self.prev_high: self.high = high
if low < self.prev_low: self.low = low
self.prev_high = high
self.prev_low = low
self._emit_kline()
# 新的k线
self.open = None
self.high = None
self.low = None
self.close = None
self.start_volume = self.end_volume
self.start_amount = self.end_amount
self.end_volume = cumulated_volume
self.end_amount = cumulated_amount
else:
self._emit_kline()
# 新的k线
self.open = price
self.high = price
self.low = price
self.close = price
if high > self.prev_high: self.high = high
if low < self.prev_low: self.low = low
self.prev_high = high
self.prev_low = low
self.start_volume = self.end_volume
self.start_amount = self.end_amount
self.end_volume = cumulated_volume
self.end_amount = cumulated_amount
self.minute_idx += 1
return
# 同一分钟:快速聚合
if price > self.high: self.high = price
if price < self.low: self.low = price
if high > self.prev_high: self.high = high
if low < self.prev_low: self.low = low
self.prev_high = high
self.prev_low = low
self.close = price
self.end_volume = cumulated_volume
self.end_amount = cumulated_amount
def _emit_kline(self):
"""推送一分钟K线"""
kline = {
"timestamp": self.minute_windows[self.minute_idx][2],
"open": self.open,
"high": self.high,
"low": self.low,
"close": self.close,
"volume": self.end_volume - self.start_volume,
"amount": self.end_amount - self.start_amount
}
if self.on_minute_closed_callback is not None:
self.on_minute_closed_callback(kline)
def emit_last_kline(self):
self._emit_kline()
def generate_a_stock_minute_windows(trading_day: str | int) -> np.ndarray:
"""
严格按需求生成 A 股 240 根 1 分钟 K 线窗口
规则:
总共 240 根:上午 120 根 + 下午 120 根
第 1 根:09:25 ~ 09:31(6分钟合并)
第 2 根开始:09:31 ~ 09:32,每分钟一根,直到 11:30
下午:13:00 ~ 15:00,每分钟一根,共 120 根
返回:
shape=(240, 3) 的 numpy 数组,每行为 [开始时间戳ms, 结束时间戳ms,标签时间戳]
"""
# 日期处理
if isinstance(trading_day, int):
trading_day = f"{trading_day:08d}"
dt = datetime.strptime(trading_day, "%Y%m%d")
dt = dt.replace(tzinfo=ZoneInfo("Asia/Shanghai"))
base_ms = int(dt.timestamp() * 1000) # 当天0点毫秒时间戳
windows = []
t1_start = base_ms + (9*60 + 25) * 60_000
t1_end = base_ms + (9*60 + 31) * 60_000
windows.append((t1_start, t1_end, t1_end))
start_min = 9*60 + 31
end_min = 11*60 + 30
for m in range(start_min, end_min):
s = base_ms + m * 60_000
e = s + 60_000
windows.append((s, e, e))
start_min_pm = 13*60 + 0
end_min_pm = 15*60 + 0
for m in range(start_min_pm, end_min_pm):
s = base_ms + m * 60_000
e = s + 60_000
windows.append((s, e, e))
arr = np.array(windows, dtype=np.int64)
arr[0][0] -= 1 # 我们要包括9:25这个边界
return arr
if __name__ == "__main__":
def minute_callback(kline):
print(datetime.fromtimestamp(kline['timestamp'] / 1000))
print(kline)
minute_windows = generate_a_stock_minute_windows('20260310')
klgen = L1TickToMinute(minute_windows=minute_windows, on_minute_closed_callback=minute_callback)
tick_data = xtdata.get_market_data([], ['600000.SH'],
start_time='20260310090000',
end_time='20260311000000', period='tick')
for tick in tick_data['600000.SH']:
klgen.update(tick)
klgen.emit_last_kline()
|