返回列表 发布新帖

L1 Tick数据到1min分钟数据转换,python。

65 0
发表于 前天 11:44 | 显示全部楼层 阅读模式

本人自用,可以从L1 Tick转成全市场分钟数据。不保证没错,仅供参考。欢迎讨论。

from xtquant import xtdata
import numpy as np
from datetime import datetime
from zoneinfo import ZoneInfo
from typing import Callable, Dict

class L1TickToMinute:
    def __init__(self, minute_windows, on_minute_closed_callback: Callable[[Dict], None]):
        self.on_minute_closed_callback = on_minute_closed_callback
        self.minute_windows = minute_windows
        # 当前分钟窗口索引
        self.minute_idx = 0

        # 当前K线数据
        self.open = None
        self.high = None
        self.low = None
        self.close = None
        self.start_volume = 0 # 当前k线对应的开始成交股数
        self.end_volume = 0 # 当前k线对应的结束成交股数
        self.start_amount = 0 # 当前k线对应的开始成额
        self.end_amount = 0 # 当前k线对应的结束成交额
        self.prev_low = None # 前一个tick的low值
        self.prev_high = None # 前一个tick的high值

        # 上一根K线收盘价(用于无成交K线)
        self.last_close = 0.0

    def update(self, tick: Dict):
        """
        处理新的Tick,到达边界时生成分钟K线
        """
        if self.minute_idx == 240:
            return

        ts = tick["time"]
        price = tick["lastPrice"]
        high = tick['high']
        low = tick['low']
        cumulated_volume = tick["volume"]
        cumulated_amount = tick["amount"]

        # 没到9:25,跳过
        if (ts < self.minute_windows[0][0]) | (price == 0):
            return

        # 9:31, 13:01的初始值
        if self.open is None:
            self.open = price
            self.high = price
            self.low = price
            self.prev_high = high
            self.prev_low = low

        # 最后一根K线依赖的Tick时间戳,大于15:00:00的就不断更新,不判断终止。
        if self.minute_idx == 239:
            if ts >= self.minute_windows[self.minute_idx][1]:
                self.open = price
                self.high = price
                self.low = price
                self.close = price
                self.end_volume = cumulated_volume
                self.end_amount = cumulated_amount
            return

        if ts >= self.minute_windows[self.minute_idx][1]:
            # 上一根K线结束,吐出
            if self.minute_idx == 119: # 11:30的处理
                self.end_volume = cumulated_volume
                self.end_amount = cumulated_amount
                self.close = price
                if high > self.prev_high: self.high = high
                if low < self.prev_low: self.low = low
                self.prev_high = high
                self.prev_low = low
                self._emit_kline()
                # 新的k线          
                self.open = None
                self.high = None
                self.low = None
                self.close = None
                self.start_volume = self.end_volume
                self.start_amount = self.end_amount
                self.end_volume = cumulated_volume
                self.end_amount = cumulated_amount
            else:
                self._emit_kline()
                # 新的k线
                self.open = price
                self.high = price
                self.low = price
                self.close = price
                if high > self.prev_high: self.high = high
                if low < self.prev_low: self.low = low
                self.prev_high = high
                self.prev_low = low
                self.start_volume = self.end_volume
                self.start_amount = self.end_amount
                self.end_volume = cumulated_volume
                self.end_amount = cumulated_amount

            self.minute_idx += 1
            return

        # 同一分钟:快速聚合
        if price > self.high: self.high = price
        if price < self.low:  self.low = price
        if high > self.prev_high: self.high = high
        if low < self.prev_low: self.low = low
        self.prev_high = high
        self.prev_low = low
        self.close = price
        self.end_volume = cumulated_volume
        self.end_amount = cumulated_amount

    def _emit_kline(self):
        """推送一分钟K线"""
        kline = {
            "timestamp": self.minute_windows[self.minute_idx][2],
            "open": self.open,
            "high": self.high,
            "low": self.low,
            "close": self.close,
            "volume": self.end_volume - self.start_volume,
            "amount": self.end_amount - self.start_amount
        }
        if self.on_minute_closed_callback is not None:
            self.on_minute_closed_callback(kline)

    def emit_last_kline(self):
        self._emit_kline()

def generate_a_stock_minute_windows(trading_day: str | int) -> np.ndarray:
    """
    严格按需求生成 A 股 240 根 1 分钟 K 线窗口
    规则:
      总共 240 根:上午 120 根 + 下午 120 根
      第 1 根:09:25 ~ 09:31(6分钟合并)
      第 2 根开始:09:31 ~ 09:32,每分钟一根,直到 11:30
      下午:13:00 ~ 15:00,每分钟一根,共 120 根
    返回:
      shape=(240, 3) 的 numpy 数组,每行为 [开始时间戳ms, 结束时间戳ms,标签时间戳]
    """
    # 日期处理
    if isinstance(trading_day, int):
        trading_day = f"{trading_day:08d}"
    dt = datetime.strptime(trading_day, "%Y%m%d")
    dt = dt.replace(tzinfo=ZoneInfo("Asia/Shanghai"))
    base_ms = int(dt.timestamp() * 1000)  # 当天0点毫秒时间戳

    windows = []

    t1_start = base_ms + (9*60 + 25) * 60_000
    t1_end   = base_ms + (9*60 + 31) * 60_000
    windows.append((t1_start, t1_end, t1_end))

    start_min = 9*60 + 31
    end_min   = 11*60 + 30
    for m in range(start_min, end_min):
        s = base_ms + m * 60_000
        e = s + 60_000
        windows.append((s, e, e))

    start_min_pm = 13*60 + 0
    end_min_pm   = 15*60 + 0
    for m in range(start_min_pm, end_min_pm):
        s = base_ms + m * 60_000
        e = s + 60_000
        windows.append((s, e, e))

    arr = np.array(windows, dtype=np.int64)
    arr[0][0] -= 1 # 我们要包括9:25这个边界
    return arr

if __name__ == "__main__":
    def minute_callback(kline):
        print(datetime.fromtimestamp(kline['timestamp'] / 1000))
        print(kline)
    minute_windows = generate_a_stock_minute_windows('20260310')
    klgen = L1TickToMinute(minute_windows=minute_windows, on_minute_closed_callback=minute_callback)

    tick_data = xtdata.get_market_data([], ['600000.SH'],
                                       start_time='20260310090000',
                                        end_time='20260311000000', period='tick')

    for tick in tick_data['600000.SH']:
        klgen.update(tick)

    klgen.emit_last_kline()

回复

您需要登录后才可以回帖 登录 | 立即注册

主题

1

回帖

0

积分

0

客服专线

400-080-8112

用思考的速度交易,用真诚的态度合作,我们是认真的!
  • 关注公众号
  • 添加微信客服
Copyright © 2001-2026 迅投QMT社区 版权所有 All Rights Reserved. 京ICP备2025122616号-3
关灯 快速发帖
扫一扫添加微信客服
QQ客服返回顶部
快速回复 返回顶部 返回列表