38 KiB
vn.py框架本地数据集成方案详解
目录
概述
vn.py(VeighNa)是一个基于Python的开源量化交易平台开发框架,采用事件驱动架构。在量化交易中,数据是核心资产,如何高效、可靠地将本地数据集成到vn.py框架中是策略开发和回测的关键环节。
本文档详细介绍五种常见的本地数据集成方法,包括具体实现步骤、优缺点分析、适用场景,并提供完整的代码示例。
数据接口适配器模式
核心思想
通过实现vn.py提供的标准化数据接口(BaseDatafeed),将本地数据源适配为vn.py可识别的数据服务。这是最符合vn.py架构设计的集成方式。
实现步骤
1. 理解BaseDatafeed接口
vn.py在vnpy.trader.datafeed模块中定义了BaseDatafeed抽象基类,主要方法包括:
init(): 初始化数据服务query_bar_history(): 查询K线历史数据query_tick_history(): 查询Tick历史数据
2. 创建自定义数据适配器类
# local_datafeed.py
from typing import List, Optional
from datetime import datetime
from vnpy.trader.object import BarData, TickData
from vnpy.trader.constant import Interval
from vnpy.trader.datafeed import BaseDatafeed
class LocalDatafeed(BaseDatafeed):
"""本地数据适配器"""
def __init__(self):
super().__init__()
self.data_path = "./local_data" # 本地数据路径
self.inited = False
def init(self, output: bool = True) -> bool:
"""初始化数据服务"""
try:
# 这里可以做一些初始化工作,比如检查数据目录、加载元数据等
import os
if not os.path.exists(self.data_path):
os.makedirs(self.data_path)
self.inited = True
if output:
print("本地数据服务初始化成功")
return True
except Exception as e:
if output:
print(f"本地数据服务初始化失败: {e}")
return False
def query_bar_history(
self,
symbol: str,
exchange: str,
interval: Interval,
start: datetime,
end: datetime,
limit: int = 1000
) -> List[BarData]:
"""查询K线历史数据"""
if not self.inited:
return []
bars = []
try:
# 这里实现从本地数据源读取K线数据的逻辑
# 示例:从CSV文件读取
import pandas as pd
import os
filename = f"{symbol}_{exchange.value}_{interval.value}.csv"
filepath = os.path.join(self.data_path, filename)
if not os.path.exists(filepath):
print(f"数据文件不存在: {filepath}")
return []
df = pd.read_csv(filepath)
df['datetime'] = pd.to_datetime(df['datetime'])
# 过滤时间范围
mask = (df['datetime'] >= start) & (df['datetime'] <= end)
df = df.loc[mask].head(limit)
# 转换为BarData对象
for _, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=row['datetime'],
open_price=row['open'],
high_price=row['high'],
low_price=row['low'],
close_price=row['close'],
volume=row['volume'],
turnover=row.get('turnover', 0),
gateway_name="LOCAL"
)
bars.append(bar)
except Exception as e:
print(f"查询K线数据失败: {e}")
return bars
def query_tick_history(
self,
symbol: str,
exchange: str,
start: datetime,
end: datetime,
limit: int = 1000
) -> List[TickData]:
"""查询Tick历史数据"""
# 实现类似query_bar_history,但针对Tick数据
# 这里省略具体实现
return []
3. 配置vn.py使用自定义数据适配器
在vn.py的配置文件(通常是vt_setting.json)中添加:
{
"datafeed.name": "local",
"datafeed.username": "",
"datafeed.password": ""
}
然后在主程序中注册数据适配器:
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.datafeed import DatafeedManager
from local_datafeed import LocalDatafeed
# 创建主引擎
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
# 注册本地数据适配器
datafeed_manager = main_engine.get_engine("datafeed")
datafeed_manager.add_datafeed("local", LocalDatafeed)
优缺点分析
| 优点 | 缺点 |
|---|---|
| 符合vn.py原生架构,与框架完美集成 | 需要编写较多的适配代码 |
| 可以利用vn.py已有的数据管理功能 | 对vn.py内部结构需要较深入了解 |
| 支持vn.py所有需要数据服务的功能模块 | 初期开发工作量较大 |
| 接口标准化,易于维护和扩展 |
适用场景
- 需要在vn.py中全面使用本地数据
- 策略开发、回测和实盘交易都需要本地数据支持
- 希望长期维护和使用的项目
数据服务化架构
核心思想
将本地数据封装成独立的数据服务,通过REST API、gRPC或消息队列等方式提供数据访问接口,vn.py通过网络请求获取数据。
实现步骤
1. 创建数据服务端
使用FastAPI创建一个简单的数据服务:
# data_service.py
from fastapi import FastAPI, Query
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
import pandas as pd
import os
app = FastAPI(title="本地数据服务")
DATA_PATH = "./local_data"
class BarDataResponse(BaseModel):
symbol: str
exchange: str
datetime: datetime
open: float
high: float
low: float
close: float
volume: float
turnover: Optional[float] = 0.0
@app.get("/bars", response_model=List[BarDataResponse])
async def get_bars(
symbol: str = Query(..., description="合约代码"),
exchange: str = Query(..., description="交易所代码"),
interval: str = Query("1m", description="K线周期"),
start: datetime = Query(..., description="开始时间"),
end: datetime = Query(..., description="结束时间"),
limit: int = Query(1000, description="返回数量限制")
):
"""获取K线数据"""
try:
filename = f"{symbol}_{exchange}_{interval}.csv"
filepath = os.path.join(DATA_PATH, filename)
if not os.path.exists(filepath):
return []
df = pd.read_csv(filepath)
df['datetime'] = pd.to_datetime(df['datetime'])
# 过滤时间范围
mask = (df['datetime'] >= start) & (df['datetime'] <= end)
df = df.loc[mask].head(limit)
# 构造响应
bars = []
for _, row in df.iterrows():
bar = BarDataResponse(
symbol=symbol,
exchange=exchange,
datetime=row['datetime'],
open=row['open'],
high=row['high'],
low=row['low'],
close=row['close'],
volume=row['volume'],
turnover=row.get('turnover', 0.0)
)
bars.append(bar)
return bars
except Exception as e:
print(f"获取K线数据失败: {e}")
return []
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
2. 创建vn.py端的数据访问客户端
# remote_datafeed.py
from typing import List
from datetime import datetime
import requests
from vnpy.trader.object import BarData, TickData
from vnpy.trader.constant import Interval, Exchange
from vnpy.trader.datafeed import BaseDatafeed
class RemoteDatafeed(BaseDatafeed):
"""远程数据服务适配器"""
def __init__(self):
super().__init__()
self.base_url = "http://localhost:8000"
self.inited = False
def init(self, output: bool = True) -> bool:
"""初始化"""
try:
# 测试服务连接
response = requests.get(f"{self.base_url}/docs")
if response.status_code == 200:
self.inited = True
if output:
print("远程数据服务连接成功")
return True
except Exception as e:
if output:
print(f"远程数据服务连接失败: {e}")
return False
def query_bar_history(
self,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime,
limit: int = 1000
) -> List[BarData]:
"""查询K线数据"""
if not self.inited:
return []
try:
params = {
"symbol": symbol,
"exchange": exchange.value,
"interval": interval.value,
"start": start.isoformat(),
"end": end.isoformat(),
"limit": limit
}
response = requests.get(f"{self.base_url}/bars", params=params)
if response.status_code != 200:
print(f"请求失败: {response.status_code}")
return []
data = response.json()
bars = []
for item in data:
bar = BarData(
symbol=item['symbol'],
exchange=Exchange(item['exchange']),
interval=interval,
datetime=datetime.fromisoformat(item['datetime']),
open_price=item['open'],
high_price=item['high'],
low_price=item['low'],
close_price=item['close'],
volume=item['volume'],
turnover=item.get('turnover', 0),
gateway_name="REMOTE"
)
bars.append(bar)
return bars
except Exception as e:
print(f"查询远程K线数据失败: {e}")
return []
def query_tick_history(
self,
symbol: str,
exchange: Exchange,
start: datetime,
end: datetime,
limit: int = 1000
) -> List[TickData]:
"""查询Tick数据"""
# 类似实现,这里省略
return []
3. 启动服务并在vn.py中使用
# 首先在终端启动数据服务
# python data_service.py
# 然后在vn.py中使用
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from remote_datafeed import RemoteDatafeed
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
datafeed_manager = main_engine.get_engine("datafeed")
datafeed_manager.add_datafeed("remote", RemoteDatafeed)
优缺点分析
| 优点 | 缺点 |
|---|---|
| 数据服务与vn.py解耦,可独立部署和扩展 | 增加了网络开销和延迟 |
| 支持多客户端访问,数据可共享 | 需要维护额外的服务端 |
| 可以实现复杂的数据预处理和缓存逻辑 | 服务可用性成为关键依赖 |
| 技术栈灵活,不局限于Python | 增加了系统复杂度 |
适用场景
- 多团队或多程序需要共享同一数据源
- 需要对数据进行复杂的预处理和计算
- 数据量大,需要分布式存储和处理
- 需要跨语言访问数据
文件系统直接访问
核心思想
直接在vn.py策略或模块中读取本地文件(CSV、Parquet、HDF5等),将数据转换为vn.py的数据对象。这是最简单直接的方式。
实现步骤
1. 准备本地数据文件
假设我们有一个CSV格式的K线数据文件:
datetime,open,high,low,close,volume,turnover
2024-01-01 09:00:00,5000.0,5010.0,4990.0,5005.0,1000,5005000.0
2024-01-01 09:01:00,5005.0,5020.0,5000.0,5015.0,1200,6018000.0
...
2. 创建数据加载工具类
# file_data_loader.py
from typing import List, Optional
from datetime import datetime
from pathlib import Path
import pandas as pd
from vnpy.trader.object import BarData, TickData
from vnpy.trader.constant import Interval, Exchange
class FileDataLoader:
"""文件数据加载器"""
def __init__(self, data_dir: str = "./data"):
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
def load_bars_from_csv(
self,
filename: str,
symbol: str,
exchange: Exchange,
interval: Interval,
datetime_col: str = "datetime",
open_col: str = "open",
high_col: str = "high",
low_col: str = "low",
close_col: str = "close",
volume_col: str = "volume",
turnover_col: Optional[str] = "turnover"
) -> List[BarData]:
"""从CSV文件加载K线数据"""
filepath = self.data_dir / filename
if not filepath.exists():
raise FileNotFoundError(f"数据文件不存在: {filepath}")
df = pd.read_csv(filepath)
df[datetime_col] = pd.to_datetime(df[datetime_col])
bars = []
for _, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=row[datetime_col],
open_price=row[open_col],
high_price=row[high_col],
low_price=row[low_col],
close_price=row[close_col],
volume=row[volume_col],
turnover=row[turnover_col] if turnover_col else 0,
gateway_name="FILE"
)
bars.append(bar)
return bars
def load_bars_from_parquet(
self,
filename: str,
symbol: str,
exchange: Exchange,
interval: Interval
) -> List[BarData]:
"""从Parquet文件加载K线数据(性能更好)"""
filepath = self.data_dir / filename
if not filepath.exists():
raise FileNotFoundError(f"数据文件不存在: {filepath}")
df = pd.read_parquet(filepath)
bars = []
for _, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=row['datetime'],
open_price=row['open'],
high_price=row['high'],
low_price=row['low'],
close_price=row['close'],
volume=row['volume'],
turnover=row.get('turnover', 0),
gateway_name="FILE"
)
bars.append(bar)
return bars
def save_bars_to_csv(self, bars: List[BarData], filename: str):
"""将K线数据保存到CSV文件"""
data = []
for bar in bars:
data.append({
'datetime': bar.datetime,
'open': bar.open_price,
'high': bar.high_price,
'low': bar.low_price,
'close': bar.close_price,
'volume': bar.volume,
'turnover': bar.turnover
})
df = pd.DataFrame(data)
filepath = self.data_dir / filename
df.to_csv(filepath, index=False)
print(f"数据已保存到: {filepath}")
3. 在vn.py策略中使用
# strategy_with_file_data.py
from vnpy.trader.object import BarData
from vnpy.trader.strategy import StrategyTemplate
from vnpy.trader.utility import ArrayManager
from file_data_loader import FileDataLoader
class FileDataStrategy(StrategyTemplate):
"""使用文件数据的策略示例"""
author = "Your Name"
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.am = ArrayManager(100)
self.data_loader = FileDataLoader("./data")
# 策略参数
self.fast_window = 10
self.slow_window = 30
# 策略变量
self.fast_ma = 0
self.slow_ma = 0
def on_init(self):
"""策略初始化"""
self.write_log("策略初始化")
# 加载历史数据用于预热
try:
symbol, exchange_str = self.vt_symbol.split(".")
exchange = type(self.exchange)(exchange_str)
# 加载历史数据
bars = self.data_loader.load_bars_from_csv(
filename=f"{symbol}_history.csv",
symbol=symbol,
exchange=exchange,
interval=self.interval
)
# 预热数据
for bar in bars:
self.am.update_bar(bar)
self.write_log(f"成功加载{len(bars)}条历史数据")
except Exception as e:
self.write_log(f"加载历史数据失败: {e}")
def on_start(self):
"""策略启动"""
self.write_log("策略启动")
def on_stop(self):
"""策略停止"""
self.write_log("策略停止")
def on_bar(self, bar: BarData):
"""K线推送"""
self.am.update_bar(bar)
if not self.am.inited:
return
# 计算均线
self.fast_ma = self.am.sma(self.fast_window, array=True)
self.slow_ma = self.am.sma(self.slow_window, array=True)
# 策略逻辑
# ... 省略具体策略实现
优缺点分析
| 优点 | 缺点 |
|---|---|
| 实现简单直接,无需额外基础设施 | 数据管理能力较弱 |
| 开发速度快,适合快速验证想法 | 不适合大规模数据 |
| 可使用各种文件格式(CSV、Parquet、HDF5等) | 数据共享困难 |
| 没有网络开销,速度快 | 缺乏统一的数据访问接口 |
适用场景
- 快速策略原型开发和验证
- 数据量较小的个人研究
- 一次性或临时的数据分析任务
- 离线回测场景
数据库存储方案
核心思想
将本地数据存储在数据库中(SQLite、MySQL、PostgreSQL、MongoDB等),通过vn.py的数据库管理接口或直接编写SQL/ORM代码访问数据。
实现步骤
1. 数据库表结构设计
以SQLite为例,设计K线数据表:
# database_schema.py
from sqlalchemy import create_engine, Column, String, Float, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class DbBarData(Base):
"""K线数据表"""
__tablename__ = "db_bar_data"
id = Column(Integer, primary_key=True, autoincrement=True)
symbol = Column(String(32), index=True)
exchange = Column(String(32), index=True)
interval = Column(String(16), index=True)
datetime = Column(DateTime, index=True)
open_price = Column(Float)
high_price = Column(Float)
low_price = Column(Float)
close_price = Column(Float)
volume = Column(Float)
turnover = Column(Float)
__table_args__ = (
{'sqlite_autoincrement': True}
)
class DbTickData(Base):
"""Tick数据表"""
__tablename__ = "db_tick_data"
id = Column(Integer, primary_key=True, autoincrement=True)
symbol = Column(String(32), index=True)
exchange = Column(String(32), index=True)
datetime = Column(DateTime, index=True)
# 省略其他字段...
def init_database(db_url: str = "sqlite:///./data/vnpy_data.db"):
"""初始化数据库"""
engine = create_engine(db_url)
Base.metadata.create_all(engine)
return engine, sessionmaker(bind=engine)
2. 创建数据库管理类
# database_manager.py
from typing import List, Optional
from datetime import datetime
from sqlalchemy import and_, desc
from sqlalchemy.orm import Session
from database_schema import DbBarData, init_database
from vnpy.trader.object import BarData
from vnpy.trader.constant import Interval, Exchange
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_url: str = "sqlite:///./data/vnpy_data.db"):
self.engine, self.SessionLocal = init_database(db_url)
def get_session(self) -> Session:
"""获取数据库会话"""
return self.SessionLocal()
def save_bar_data(self, bars: List[BarData]):
"""保存K线数据"""
if not bars:
return
session = self.get_session()
try:
for bar in bars:
db_bar = DbBarData(
symbol=bar.symbol,
exchange=bar.exchange.value,
interval=bar.interval.value,
datetime=bar.datetime,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume,
turnover=bar.turnover
)
session.add(db_bar)
session.commit()
print(f"成功保存{len(bars)}条K线数据")
except Exception as e:
session.rollback()
print(f"保存K线数据失败: {e}")
finally:
session.close()
def load_bar_data(
self,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime,
limit: Optional[int] = None
) -> List[BarData]:
"""加载K线数据"""
session = self.get_session()
try:
query = session.query(DbBarData).filter(
and_(
DbBarData.symbol == symbol,
DbBarData.exchange == exchange.value,
DbBarData.interval == interval.value,
DbBarData.datetime >= start,
DbBarData.datetime <= end
)
).order_by(DbBarData.datetime)
if limit:
query = query.limit(limit)
db_bars = query.all()
bars = []
for db_bar in db_bars:
bar = BarData(
symbol=db_bar.symbol,
exchange=Exchange(db_bar.exchange),
interval=Interval(db_bar.interval),
datetime=db_bar.datetime,
open_price=db_bar.open_price,
high_price=db_bar.high_price,
low_price=db_bar.low_price,
close_price=db_bar.close_price,
volume=db_bar.volume,
turnover=db_bar.turnover,
gateway_name="DB"
)
bars.append(bar)
return bars
except Exception as e:
print(f"加载K线数据失败: {e}")
return []
finally:
session.close()
def get_bar_overview(self) -> List[dict]:
"""获取K线数据概览"""
session = self.get_session()
try:
from sqlalchemy import func
result = session.query(
DbBarData.symbol,
DbBarData.exchange,
DbBarData.interval,
func.min(DbBarData.datetime).label('start'),
func.max(DbBarData.datetime).label('end'),
func.count(DbBarData.id).label('count')
).group_by(
DbBarData.symbol,
DbBarData.exchange,
DbBarData.interval
).all()
overview = []
for row in result:
overview.append({
'symbol': row.symbol,
'exchange': row.exchange,
'interval': row.interval,
'start': row.start,
'end': row.end,
'count': row.count
})
return overview
except Exception as e:
print(f"获取数据概览失败: {e}")
return []
finally:
session.close()
3. 与vn.py集成使用
# 使用示例
from datetime import datetime, timedelta
from vnpy.trader.constant import Exchange, Interval
from database_manager import DatabaseManager
# 初始化数据库管理器
db_manager = DatabaseManager()
# 示例1: 保存数据
# 假设我们有一些BarData对象
# bars = [...]
# db_manager.save_bar_data(bars)
# 示例2: 加载数据
end = datetime.now()
start = end - timedelta(days=30)
bars = db_manager.load_bar_data(
symbol="rb2405",
exchange=Exchange.SHFE,
interval=Interval.MINUTE,
start=start,
end=end
)
print(f"加载了{len(bars)}条K线数据")
# 示例3: 获取数据概览
overview = db_manager.get_bar_overview()
print("数据概览:")
for item in overview:
print(f"{item['symbol']}.{item['exchange']} {item['interval']}: {item['count']}条 ({item['start']} ~ {item['end']})")
优缺点分析
| 优点 | 缺点 |
|---|---|
| 数据管理能力强,支持查询、索引、事务 | 需要数据库基础设施 |
| 适合大规模数据存储和查询 | 有一定的学习和维护成本 |
| 数据一致性和可靠性高 | SQLite在并发写入时性能有限 |
| 可以利用SQL的强大查询能力 | 关系型数据库对非结构化数据支持有限 |
适用场景
- 数据量大,需要高效查询
- 需要长期保存和管理历史数据
- 需要复杂的数据查询和分析
- 多程序或多策略共享数据源
混合模式
核心思想
结合以上多种方法,根据不同的数据特性和使用场景选择最适合的集成方式,形成一个灵活、高效的数据集成解决方案。
典型混合架构
┌─────────────────────────────────────────────────────────┐
│ vn.py 应用 │
├─────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 策略模块 │ │ 回测引擎 │ │ 数据管理UI │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼───────────────────┼───────────────────┼──────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────┐
│ 数据访问抽象层 │
├─────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ BaseDatafeed│ │ 数据管理器 │ │ 缓存管理器 │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼───────────────────┼───────────────────┼──────────┘
│ │ │
┌─────┴─────┬─────────────┴─────────────┬───┴─────┐
▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────────────┐ ┌────────┐ ┌────────┐
│ 文件 │ │ 数据库 │ │ 数据服务 │ │ 缓存 │ │ 适配器 │
│ 系统 │ │ │ │ (REST/gRPC) │ │ (Redis)│ │ (其他) │
└────────┘ └────────┘ └────────────────┘ └────────┘ └────────┘
实现步骤
1. 创建统一的数据访问接口
# hybrid_data_manager.py
from typing import List, Optional, Union
from datetime import datetime
from pathlib import Path
from vnpy.trader.object import BarData, TickData
from vnpy.trader.constant import Interval, Exchange
from file_data_loader import FileDataLoader
from database_manager import DatabaseManager
class HybridDataManager:
"""混合数据管理器"""
def __init__(
self,
data_dir: str = "./data",
db_url: str = "sqlite:///./data/vnpy_data.db",
use_cache: bool = True
):
self.file_loader = FileDataLoader(data_dir)
self.db_manager = DatabaseManager(db_url)
self.use_cache = use_cache
# 简单的内存缓存
self._bar_cache = {}
def _get_cache_key(
self,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime
) -> str:
"""生成缓存键"""
return f"{symbol}_{exchange.value}_{interval.value}_{start.isoformat()}_{end.isoformat()}"
def get_bars(
self,
symbol: str,
exchange: Exchange,
interval: Interval,
start: datetime,
end: datetime,
source: str = "auto" # auto, database, file, remote
) -> List[BarData]:
"""获取K线数据(智能选择数据源)"""
# 检查缓存
cache_key = self._get_cache_key(symbol, exchange, interval, start, end)
if self.use_cache and cache_key in self._bar_cache:
return self._bar_cache[cache_key]
bars = []
if source == "auto":
# 自动选择数据源策略
# 1. 优先从数据库获取
bars = self.db_manager.load_bar_data(symbol, exchange, interval, start, end)
# 2. 如果数据库没有,尝试文件
if not bars:
try:
filename = f"{symbol}_{exchange.value}_{interval.value}.csv"
bars = self.file_loader.load_bars_from_csv(
filename, symbol, exchange, interval
)
# 同时保存到数据库,下次就可以直接从数据库获取
if bars:
self.db_manager.save_bar_data(bars)
except FileNotFoundError:
pass
# 3. 如果文件也没有,可以尝试从远程服务获取(这里省略)
# if not bars:
# bars = self.remote_datafeed.query_bar_history(...)
elif source == "database":
bars = self.db_manager.load_bar_data(symbol, exchange, interval, start, end)
elif source == "file":
try:
filename = f"{symbol}_{exchange.value}_{interval.value}.csv"
bars = self.file_loader.load_bars_from_csv(filename, symbol, exchange, interval)
except FileNotFoundError:
pass
# 缓存结果
if self.use_cache and bars:
self._bar_cache[cache_key] = bars
return bars
def save_bars(
self,
bars: List[BarData],
destinations: List[str] = None
):
"""保存K线数据到多个目标"""
if not bars:
return
if destinations is None:
destinations = ["database", "file"]
# 保存到数据库
if "database" in destinations:
self.db_manager.save_bar_data(bars)
# 保存到文件
if "file" in destinations:
bar = bars[0]
filename = f"{bar.symbol}_{bar.exchange.value}_{bar.interval.value}.csv"
self.file_loader.save_bars_to_csv(bars, filename)
# 清除相关缓存
self._bar_cache.clear()
def get_data_overview(self) -> dict:
"""获取完整的数据概览"""
overview = {
"database": self.db_manager.get_bar_overview(),
"files": []
}
# 获取文件数据概览
data_dir = Path(self.file_loader.data_dir)
for filepath in data_dir.glob("*.csv"):
overview["files"].append({
"filename": filepath.name,
"size": filepath.stat().st_size,
"modified": datetime.fromtimestamp(filepath.stat().st_mtime)
})
return overview
2. 在vn.py策略中使用混合数据管理
# hybrid_strategy.py
from vnpy.trader.object import BarData
from vnpy.trader.strategy import StrategyTemplate
from hybrid_data_manager import HybridDataManager
class HybridDataStrategy(StrategyTemplate):
"""使用混合数据管理的策略"""
author = "Your Name"
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
# 初始化混合数据管理器
self.data_manager = HybridDataManager(
data_dir="./data",
db_url="sqlite:///./data/strategy_data.db",
use_cache=True
)
# 策略参数和变量
# ...
def on_init(self):
"""策略初始化"""
self.write_log("策略初始化")
# 加载历史数据
from datetime import datetime, timedelta
symbol, exchange_str = self.vt_symbol.split(".")
exchange = type(self.exchange)(exchange_str)
end = datetime.now()
start = end - timedelta(days=60)
# 使用混合数据管理器加载数据
bars = self.data_manager.get_bars(
symbol=symbol,
exchange=exchange,
interval=self.interval,
start=start,
end=end,
source="auto" # 自动选择最佳数据源
)
self.write_log(f"成功加载{len(bars)}条历史数据")
# 预热数据
for bar in bars:
self.on_bar(bar)
def on_bar(self, bar: BarData):
"""K线推送"""
# 策略逻辑
# ...
# 可选:实时保存数据
# self.data_manager.save_bars([bar])
优缺点分析
| 优点 | 缺点 |
|---|---|
| 灵活性最高,可以根据场景选择最佳方式 | 系统复杂度较高 |
| 可以充分发挥各种方案的优势 | 需要维护多个数据存储 |
| 数据可靠性高,多重备份 | 数据一致性管理较复杂 |
| 性能最优,热点数据可缓存 | 学习和维护成本高 |
适用场景
- 复杂的生产环境
- 对数据可靠性要求极高的场景
- 需要处理多种类型和来源的数据
- 有一定技术能力和维护资源的团队
方案对比与选择建议
方案对比表
| 方案 | 实现难度 | 性能 | 可维护性 | 扩展性 | 适用数据量 |
|---|---|---|---|---|---|
| 数据接口适配器 | 中 | 中 | 高 | 高 | 中-大 |
| 数据服务化架构 | 高 | 中 | 高 | 很高 | 大 |
| 文件系统直接访问 | 低 | 高 | 低 | 低 | 小-中 |
| 数据库存储方案 | 中 | 中高 | 中高 | 中高 | 中-大 |
| 混合模式 | 很高 | 很高 | 中 | 很高 | 任意 |
选择建议
-
快速原型和验证
- 推荐:文件系统直接访问
- 理由:实现简单,开发速度快
-
个人研究和小团队
- 推荐:数据库存储方案 + 文件系统
- 理由:平衡了功能和复杂度
-
中型团队和生产环境
- 推荐:数据接口适配器模式 + 数据库
- 理由:符合vn.py架构,便于维护
-
大型团队和企业级应用
- 推荐:数据服务化架构 或 混合模式
- 理由:可扩展性强,支持多团队共享
-
特殊场景
- 需要跨语言访问:数据服务化架构
- 对性能要求极致:文件系统(Parquet/HDF5) + 缓存
- 数据来源复杂:混合模式
最佳实践建议
-
数据标准化
- 统一数据格式和命名规范
- 建立数据质量检查机制
-
分层设计
- 数据访问层与业务逻辑分离
- 使用抽象接口便于切换实现
-
缓存策略
- 热点数据内存缓存
- 考虑使用Redis等分布式缓存
-
监控和日志
- 记录数据访问日志
- 监控数据加载性能
-
备份和恢复
- 定期备份重要数据
- 测试恢复流程
总结
将本地数据集成到vn.py框架有多种方法,每种方法都有其优缺点和适用场景。选择合适的方案需要考虑:
- 项目规模和复杂度
- 团队技术能力
- 数据量和访问模式
- 性能和可靠性要求
- 维护和扩展需求
建议从小处着手,先使用简单的方案验证想法,然后根据需要逐步演进到更复杂的架构。混合模式虽然复杂,但往往是长期生产环境的最佳选择。
希望本文档能帮助你选择和实现适合的本地数据集成方案!