Files
sanguo_quant_live/zhaoyun-data/test_adapter.py
T
cfdaily affcfa0c72 按照工作流规则进行目录整理
**主要调整:**
1. 重命名将军工作区目录:
   - data-engineering → zhaoyun-data (赵云数据工程)
   - risk-management → guanyu-risk (关羽风控管理)
   - platform → jiangwei-platform (姜维平台)
   - technical-strategy → zhangfei-technical (张飞技术策略)

2. 创建新目录:
   - archive/ (归档目录)
   - simayi-quality/ (司马懿质量保证)
   - pangtong-value/ (庞统价值投资)

3. 移动内容:
   - value-investing → pangtong-value/research (庞统价值投资)
   - running_data → zhaoyun-data/data (运行数据)
   - 文件任务管理系统文档 → archive/file-task-system

4. 清理文件:
   - 删除所有日志文件
   - 删除agent脚本
   - 删除knowledge-base (使用统一知识库)

5. 创建标准结构:
   - 各将军目录下创建research/, scripts/, reports/, references/子目录

6. 更新.gitignore:
   - 排除日志文件和临时文件

**依据:** management/workflow-rules.md
**制定:** 庞统(凤雏)
**审核:** 诸葛亮
2026-03-25 17:27:35 +08:00

113 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
数据适配器测试脚本
测试 akshare → vn.py 数据适配器的基本功能
作者:赵云(数据护军)
日期:2026-03-24
"""
import sys
import os
import logging
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(__file__))
from akshare_vnpy_adapter import AkshareToVnpyAdapter
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
def test_adapter():
"""测试适配器功能"""
# 创建适配器
db_path = '/Users/chufeng/.openclaw/workspace-pangtong/sanguo_quant_live/running_data/database_test.db'
adapter = AkshareToVnpyAdapter(db_path)
try:
logger.info("=" * 60)
logger.info("开始测试 akshare → vn.py 数据适配器")
logger.info("=" * 60)
# 测试1:初始化数据库
logger.info("\n[测试1] 初始化数据库表结构...")
adapter.initialize_database()
logger.info("✓ 数据库初始化成功")
# 测试2:获取股票列表
logger.info("\n[测试2] 获取股票列表...")
stock_list = adapter.get_stock_list()
logger.info(f"✓ 获取到 {len(stock_list)} 只股票")
logger.info(f" 前5只: {stock_list.head().to_string()}")
# 测试3:测试单只股票数据获取(茅台 600519)
logger.info("\n[测试3] 获取单只股票数据(茅台 600519)...")
test_code = '600519'
df = adapter.fetch_stock_daily(test_code, start_date="20250101", end_date="20250324")
logger.info(f"✓ 获取到 {len(df)} 条K线数据")
if len(df) > 0:
logger.info(f" 数据列: {list(df.columns)}")
logger.info(f" 前3条:\n{df.head(3).to_string()}")
logger.info(f" 后3条:\n{df.tail(3).to_string()}")
# 测试4:数据格式转换
logger.info("\n[测试4] 数据格式转换...")
symbol, exchange = adapter.parse_symbol(test_code)
logger.info(f" 股票代码: {test_code} -> symbol={symbol}, exchange={exchange}")
bars = adapter.convert_bar_to_vnpy(df, symbol, exchange, '1d')
logger.info(f"✓ 转换了 {len(bars)} 条记录")
if len(bars) > 0:
logger.info(f" 第一条: {bars[0]}")
# 测试5:批量插入
logger.info("\n[测试5] 批量插入数据库...")
inserted = adapter.insert_bars_bulk(bars)
logger.info(f"✓ 成功插入 {inserted} 条记录")
# 测试6:验证数据
logger.info("\n[测试6] 验证数据完整性...")
integrity = adapter.verify_data_integrity()
logger.info(f"✓ 验证完成,状态: {integrity['status']}")
# 测试7:完整流程测试(下载多只)
logger.info("\n[测试7] 完整流程测试(下载5只股票)...")
test_codes = ['000001', '000002', '600000', '600519', '600036']
for code in test_codes:
logger.info(f" 下载 {code}...")
inserted = adapter.download_and_insert_stock_daily(code, start_date="20250101")
logger.info(f"{code}: {inserted}")
# 最终验证
logger.info("\n[最终验证] 数据完整性验证...")
integrity = adapter.verify_data_integrity()
logger.info("=" * 60)
logger.info("测试完成!")
logger.info("=" * 60)
logger.info(f"总K线记录: {integrity['total_bars']}")
logger.info(f"股票数量: {integrity['total_stocks']}")
logger.info(f"时间范围: {integrity['min_date']} ~ {integrity['max_date']}")
logger.info(f"状态: {integrity['status']}")
logger.info("=" * 60)
return True
except Exception as e:
logger.error(f"测试失败: {e}", exc_info=True)
return False
finally:
adapter.close()
if __name__ == '__main__':
success = test_adapter()
sys.exit(0 if success else 1)