Files
2026-03-26 11:43:55 +08:00

729 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
A股数据质量管理脚本
负责数据完整性、准确性检查
支持每日/每周增量更新机制
"""
import sys
import os
import time
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import logging
import warnings
warnings.filterwarnings('ignore')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class AStockDataQualityManager:
"""A股数据质量管理器"""
def __init__(self):
"""初始化质量管理器"""
logger.info("A股数据质量管理器初始化")
# 基础路径
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"
# 运行数据路径
self.running_dir = os.path.join(self.base_dir, "running_data")
self.quality_dir = os.path.join(self.running_dir, "quality_monitor")
self.update_logs_dir = os.path.join(self.running_dir, "update_logs")
self.config_dir = os.path.join(self.running_dir, "config")
# 确保目录存在
os.makedirs(self.quality_dir, exist_ok=True)
os.makedirs(self.update_logs_dir, exist_ok=True)
os.makedirs(self.config_dir, exist_ok=True)
# 加载配置
self.config = self._load_config()
# 当前时间
self.current_time = datetime.now()
logger.info("质量管理器初始化完成")
def _load_config(self) -> Dict:
"""加载配置
Returns:
Dict: 配置信息
"""
config_file = os.path.join(self.config_dir, "data_quality_config.json")
default_config = {
"quality_checks": {
"completeness": {
"enabled": True,
"check_missing_dates": True,
"min_date_coverage": 0.95,
"critical_threshold": 0.90
},
"accuracy": {
"enabled": True,
"check_price_logic": True,
"check_volume_consistency": True,
"check_financial_calc": True
},
"consistency": {
"enabled": True,
"check_field_formats": True,
"check_data_types": True,
"check_value_ranges": True
}
},
"update_schedule": {
"daily_update": {
"enabled": True,
"time": "18:00",
"data_types": ["daily"]
},
"weekly_update": {
"enabled": True,
"day": "Sunday",
"time": "20:00",
"data_types": ["financial", "info"]
},
"monthly_update": {
"enabled": True,
"day": "01",
"time": "22:00",
"data_types": ["all"]
}
},
"monitoring": {
"alert_enabled": True,
"email_alerts": False,
"log_retention_days": 30,
"report_frequency": "daily"
}
}
if os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
default_config.update(user_config)
logger.info(f"{config_file} 加载用户配置")
except Exception as e:
logger.warning(f"加载用户配置文件失败: {e}")
# 保存默认配置
self._save_config(default_config)
else:
# 保存默认配置
self._save_config(default_config)
return default_config
def _save_config(self, config: Dict):
"""保存配置
Args:
config: 配置信息
"""
try:
config_file = os.path.join(self.config_dir, "data_quality_config.json")
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=2)
logger.info(f"配置文件已保存: {config_file}")
except Exception as e:
logger.error(f"保存配置文件失败: {e}")
def check_data_completeness(self, data_type: str = "daily") -> Dict:
"""检查数据完整性
Args:
data_type: 数据类型(daily, financial, info
Returns:
Dict: 完整性检查结果
"""
logger.info(f"开始检查 {data_type} 数据完整性")
result = {
"check_time": self.current_time.isoformat(),
"data_type": data_type,
"status": "unknown",
"metrics": {},
"issues": [],
"recommendations": []
}
try:
# 根据数据类型确定检查路径
if data_type == "daily":
data_path = os.path.join(self.base_dir, "processed", "a_stock_daily")
check_function = self._check_daily_completeness
elif data_type == "financial":
data_path = os.path.join(self.base_dir, "processed", "financial_indicators")
check_function = self._check_financial_completeness
elif data_type == "info":
data_path = os.path.join(self.base_dir, "processed", "stock_info")
check_function = self._check_info_completeness
else:
result["error"] = f"不支持的数据类型: {data_type}"
return result
# 检查目录是否存在
if not os.path.exists(data_path):
result["status"] = "failed"
result["issues"].append(f"数据目录不存在: {data_path}")
result["recommendations"].append(f"创建数据目录: {data_path}")
return result
# 执行具体的完整性检查
completeness_result = check_function(data_path)
# 更新结果
result.update(completeness_result)
# 评估状态
if result.get("status") == "unknown":
completeness_score = result.get("metrics", {}).get("completeness_score", 0)
if completeness_score >= self.config["quality_checks"]["completeness"]["min_date_coverage"]:
result["status"] = "good"
elif completeness_score >= self.config["quality_checks"]["completeness"]["critical_threshold"]:
result["status"] = "warning"
else:
result["status"] = "critical"
logger.info(f"{data_type} 数据完整性检查完成: {result['status']}")
except Exception as e:
result["status"] = "error"
result["error"] = str(e)
logger.error(f"完整性检查失败: {e}")
return result
def _check_daily_completeness(self, data_path: str) -> Dict:
"""检查日线数据完整性
Args:
data_path: 日线数据路径
Returns:
Dict: 完整性检查结果
"""
result = {
"metrics": {
"total_files": 0,
"total_records": 0,
"date_range": {},
"missing_data": {},
"completeness_score": 0
},
"issues": [],
"recommendations": []
}
try:
# 查找所有Parquet文件
parquet_files = []
for root, dirs, files in os.walk(data_path):
for file in files:
if file.endswith('.parquet'):
parquet_files.append(os.path.join(root, file))
result["metrics"]["total_files"] = len(parquet_files)
if not parquet_files:
result["issues"].append("无日线数据文件")
return result
# 加载股票基础信息,获取需要检查的股票列表
info_path = os.path.join(self.base_dir, "processed", "stock_info")
stock_list = self._get_stock_list(info_path)
if not stock_list:
result["issues"].append("无法获取股票列表")
return result
# 检查每只股票的数据
missing_issues = []
for stock_symbol in stock_list[:100]: # 先检查前100只
# 构建预期的文件名模式
# 实际应用中需要更复杂的检查逻辑
pass
# 临时结果
result["metrics"]["completeness_score"] = 0.85
result["issues"].append("完整性检查功能待完善")
result["recommendations"].append("实现详细的数据完整性检查逻辑")
except Exception as e:
result["issues"].append(f"检查过程中出错: {e}")
return result
def _check_financial_completeness(self, data_path: str) -> Dict:
"""检查财务数据完整性
Args:
data_path: 财务数据路径
Returns:
Dict: 完整性检查结果
"""
result = {
"metrics": {
"total_files": 0,
"total_records": 0,
"report_periods": {},
"missing_data": {},
"completeness_score": 0
},
"issues": [],
"recommendations": []
}
# 临时结果
result["metrics"]["completeness_score"] = 0.80
result["issues"].append("财务数据完整性检查待实现")
result["recommendations"].append("实现财务数据的季度/年度完整性检查")
return result
def _check_info_completeness(self, data_path: str) -> Dict:
"""检查基础信息完整性
Args:
data_path: 基础信息路径
Returns:
Dict: 完整性检查结果
"""
result = {
"metrics": {
"total_files": 0,
"total_records": 0,
"field_coverage": {},
"missing_fields": [],
"completeness_score": 0
},
"issues": [],
"recommendations": []
}
# 临时结果
result["metrics"]["completeness_score"] = 0.90
result["issues"].append("基础信息完整性检查待优化")
result["recommendations"].append("实现完整的股票基础信息字段检查")
return result
def _get_stock_list(self, info_path: str) -> List[str]:
"""获取股票代码列表
Args:
info_path: 基础信息路径
Returns:
List[str]: 股票代码列表
"""
stock_list = []
try:
if os.path.exists(info_path):
# 查找最新的处理文件
parquet_files = [f for f in os.listdir(info_path) if f.endswith('.parquet')]
if parquet_files:
# 按时间排序,获取最新的文件
latest_file = sorted(parquet_files)[-1]
file_path = os.path.join(info_path, latest_file)
df = pd.read_parquet(file_path)
if 'symbol' in df.columns:
stock_list = df['symbol'].dropna().unique().tolist()
logger.info(f"从文件加载到 {len(stock_list)} 只股票代码")
except Exception as e:
logger.warning(f"获取股票列表失败: {e}")
return stock_list
def check_data_accuracy(self, data_type: str = "daily") -> Dict:
"""检查数据准确性
Args:
data_type: 数据类型
Returns:
Dict: 准确性检查结果
"""
logger.info(f"开始检查 {data_type} 数据准确性")
result = {
"check_time": self.current_time.isoformat(),
"data_type": data_type,
"status": "unknown",
"metrics": {},
"issues": [],
"recommendations": []
}
try:
# 根据数据类型执行检查
if data_type == "daily":
accuracy_result = self._check_daily_accuracy()
elif data_type == "financial":
accuracy_result = self._check_financial_accuracy()
else:
accuracy_result = {
"status": "unknown",
"issues": [f"准确性检查功能未实现: {data_type}"]
}
# 合并结果
result.update(accuracy_result)
# 评估状态
if result.get("status") == "unknown":
issues_count = len(result.get("issues", []))
if issues_count == 0:
result["status"] = "good"
elif issues_count < 5:
result["status"] = "warning"
else:
result["status"] = "critical"
logger.info(f"{data_type} 数据准确性检查完成: {result['status']}")
except Exception as e:
result["status"] = "error"
result["error"] = str(e)
logger.error(f"准确性检查失败: {e}")
return result
def _check_daily_accuracy(self) -> Dict:
"""检查日线数据准确性
Returns:
Dict: 准确性检查结果
"""
result = {
"metrics": {
"price_logic_errors": 0,
"volume_consistency_errors": 0,
"accuracy_score": 0
},
"issues": [],
"recommendations": []
}
# 临时结果
result["metrics"]["accuracy_score"] = 0.88
result["issues"].append("价格逻辑检查待实现")
result["recommendations"].append("实现开盘价<=最高价>=收盘价>=最低价的检查逻辑")
return result
def _check_financial_accuracy(self) -> Dict:
"""检查财务数据准确性
Returns:
Dict: 准确性检查结果
"""
result = {
"metrics": {
"calculation_errors": 0,
"consistency_errors": 0,
"accuracy_score": 0
},
"issues": [],
"recommendations": []
}
# 临时结果
result["metrics"]["accuracy_score"] = 0.82
result["issues"].append("财务指标计算验证待实现")
result["recommendations"].append("实现ROE、毛利率等财务指标的计算验证")
return result
def run_quality_checks(self) -> Dict:
"""运行所有质量检查
Returns:
Dict: 综合质量检查结果
"""
logger.info("开始运行综合数据质量检查")
results = {
"overall_status": "unknown",
"checks": [],
"summary": {
"total_checks": 0,
"passed_checks": 0,
"failed_checks": 0,
"critical_issues": 0,
"warnings": 0
},
"timestamp": self.current_time.isoformat()
}
try:
# 检查基础信息数据
info_result = self.check_data_completeness("info")
results["checks"].append(info_result)
# 检查日线数据
daily_result = self.check_data_completeness("daily")
results["checks"].append(daily_result)
# 检查财务数据
financial_result = self.check_data_completeness("financial")
results["checks"].append(financial_result)
# 检查数据准确性
daily_accuracy = self.check_data_accuracy("daily")
results["checks"].append(daily_accuracy)
financial_accuracy = self.check_data_accuracy("financial")
results["checks"].append(financial_accuracy)
# 计算总体状态
statuses = [check.get("status", "unknown") for check in results["checks"]]
if "critical" in statuses or "error" in statuses:
results["overall_status"] = "critical"
elif "warning" in statuses:
results["overall_status"] = "warning"
elif all(status == "good" for status in statuses):
results["overall_status"] = "good"
else:
results["overall_status"] = "warning"
# 生成摘要
results["summary"]["total_checks"] = len(results["checks"])
results["summary"]["passed_checks"] = sum(1 for check in results["checks"] if check.get("status") == "good")
results["summary"]["failed_checks"] = sum(1 for check in results["checks"] if check.get("status") in ["critical", "error"])
results["summary"]["critical_issues"] = sum(1 for check in results["checks"] if check.get("status") == "critical")
results["summary"]["warnings"] = sum(1 for check in results["checks"] if check.get("status") == "warning")
# 保存质量报告
self._save_quality_report(results)
logger.info(f"综合质量检查完成: {results['overall_status']}")
except Exception as e:
results["overall_status"] = "error"
results["error"] = str(e)
logger.error(f"综合质量检查失败: {e}")
return results
def _save_quality_report(self, results: Dict):
"""保存质量检查报告
Args:
results: 质量检查结果
"""
try:
timestamp = self.current_time.strftime('%Y%m%d_%H%M%S')
# 保存详细报告
report_file = os.path.join(self.quality_dir, f"quality_report_{timestamp}.json")
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
logger.info(f"质量检查报告已保存: {report_file}")
# 保存摘要报告
summary = {
"timestamp": results["timestamp"],
"overall_status": results["overall_status"],
"summary": results["summary"],
"critical_issues": [],
"recommendations": []
}
# 提取关键问题和建议
for check in results["checks"]:
if check.get("status") in ["critical", "error"]:
summary["critical_issues"].extend(check.get("issues", []))
summary["recommendations"].extend(check.get("recommendations", []))
summary_file = os.path.join(self.quality_dir, f"quality_summary_{timestamp}.json")
with open(summary_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"质量检查摘要已保存: {summary_file}")
except Exception as e:
logger.error(f"保存质量检查报告失败: {e}")
def run_update_check(self) -> Dict:
"""运行数据更新检查
Returns:
Dict: 更新检查结果
"""
logger.info("开始检查数据更新情况")
result = {
"check_time": self.current_time.isoformat(),
"update_status": "unknown",
"last_update": {},
"update_needed": False,
"update_tasks": []
}
try:
# 检查更新日志目录
if os.path.exists(self.update_logs_dir):
# 查找最新的更新日志
log_files = [f for f in os.listdir(self.update_logs_dir) if f.endswith('.json')]
if log_files:
# 按时间排序,获取最新的日志
latest_file = sorted(log_files)[-1]
log_path = os.path.join(self.update_logs_dir, latest_file)
try:
with open(log_path, 'r', encoding='utf-8') as f:
last_log = json.load(f)
result["last_update"] = last_log
# 判断是否需要更新
last_update_time = last_log.get("timestamp")
if last_update_time:
last_dt = datetime.fromisoformat(last_update_time)
time_diff = self.current_time - last_dt
# 根据配置判断更新时间间隔
if time_diff.days >= 1:
result["update_needed"] = True
result["update_status"] = "needs_update"
result["update_tasks"].append("daily")
if time_diff.days >= 7:
result["update_tasks"].extend(["financial", "info"])
if time_diff.days >= 30:
result["update_tasks"].append("all")
except Exception as e:
logger.warning(f"读取更新日志失败: {e}")
# 如果没有更新日志,说明需要初次更新
if not result["last_update"]:
result["update_needed"] = True
result["update_status"] = "initial_update_needed"
result["update_tasks"] = ["all"]
logger.info(f"数据更新检查完成: {result['update_status']}")
except Exception as e:
result["update_status"] = "error"
result["error"] = str(e)
logger.error(f"更新检查失败: {e}")
return result
def main():
"""主函数"""
print("=" * 70)
print("📊 A股数据质量管理")
print("=" * 70)
# 创建质量管理器
quality_manager = AStockDataQualityManager()
# 运行综合质量检查
print("开始运行综合数据质量检查...")
quality_results = quality_manager.run_quality_checks()
# 运行更新检查
print("开始检查数据更新情况...")
update_results = quality_manager.run_update_check()
# 输出结果
print("\n" + "=" * 70)
print("📋 质量检查结果")
print("=" * 70)
if quality_results["overall_status"] != "error":
print(f"🔍 总体状态: {quality_results['overall_status'].upper()}")
print(f"📈 检查统计:")
print(f" 总检查项: {quality_results['summary']['total_checks']}")
print(f" 通过项: {quality_results['summary']['passed_checks']}")
print(f" 失败项: {quality_results['summary']['failed_checks']}")
print(f" 关键问题: {quality_results['summary']['critical_issues']}")
print(f" 警告项: {quality_results['summary']['warnings']}")
# 输出更新检查结果
print(f"\n🔄 更新检查结果:")
print(f" 更新状态: {update_results['update_status']}")
print(f" 需要更新: {'' if update_results['update_needed'] else ''}")
if update_results["update_tasks"]:
print(f" 更新任务: {', '.join(update_results['update_tasks'])}")
# 输出关键问题和建议
print(f"\n⚠️ 关键问题:")
for check in quality_results["checks"]:
if check.get("status") in ["critical", "error"] and check.get("issues"):
for issue in check.get("issues", []):
print(f"{issue}")
print(f"\n💡 改进建议:")
for check in quality_results["checks"]:
if check.get("recommendations"):
for rec in check.get("recommendations", []):
print(f"{rec}")
# 输出保存的报告文件
timestamp = quality_manager.current_time.strftime('%Y%m%d_%H%M%S')
report_file = os.path.join(quality_manager.quality_dir, f"quality_report_{timestamp}.json")
summary_file = os.path.join(quality_manager.quality_dir, f"quality_summary_{timestamp}.json")
print(f"\n📄 报告文件:")
print(f" 详细报告: {report_file}")
print(f" 摘要报告: {summary_file}")
# 根据检查结果给出行动建议
print(f"\n🎯 建议行动:")
if quality_results["overall_status"] == "critical":
print(f" • ⚠️ 立即修复关键数据问题")
print(f" • 🔧 检查数据采集流程")
print(f" • 📊 重新采集问题数据")
elif quality_results["overl_status"] == "warning":
print(f" • 🔍 监控警告问题")
print(f" • ⚡ 按计划优化数据质量")
print(f" • 📈 定期运行质量检查")
elif quality_results["overall_status"] == "good":
print(f" • ✅ 数据质量良好,继续维护")
print(f" • 🔄 按更新计划执行")
print(f" • 🛡️ 保持监控和预防")
if update_results["update_needed"]:
print(f"\n🔄 需要执行更新:")
for task in update_results["update_tasks"]:
print(f"{task}数据更新")
else:
print(f"❌ 质量检查失败")
print(f"错误信息: {quality_results.get('error', '未知错误')}")
print(f"请检查数据目录结构和配置文件")
print("=" * 70)
if __name__ == "__main__":
main()