diff --git a/zhaoyun-data/scripts/data_quality/data_quality_manager.py b/zhaoyun-data/scripts/data_quality/data_quality_manager.py new file mode 100644 index 000000000..e86ad84a6 --- /dev/null +++ b/zhaoyun-data/scripts/data_quality/data_quality_manager.py @@ -0,0 +1,729 @@ +#!/usr/bin/env python3 +""" +A股数据质量管理脚本 +负责数据完整性、准确性检查 +支持每日/每周增量更新机制 +""" +import sys +import os +import time +import json +import pandas as pd +import numpy as np +from datetime import datetime, timedelta +from typing import List, Dict, Optional, Tuple +import logging +import warnings + +warnings.filterwarnings('ignore') + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class AStockDataQualityManager: + """A股数据质量管理器""" + + def __init__(self): + """初始化质量管理器""" + logger.info("A股数据质量管理器初始化") + + # 基础路径 + self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data" + + # 运行数据路径 + self.running_dir = os.path.join(self.base_dir, "running_data") + self.quality_dir = os.path.join(self.running_dir, "quality_monitor") + self.update_logs_dir = os.path.join(self.running_dir, "update_logs") + self.config_dir = os.path.join(self.running_dir, "config") + + # 确保目录存在 + os.makedirs(self.quality_dir, exist_ok=True) + os.makedirs(self.update_logs_dir, exist_ok=True) + os.makedirs(self.config_dir, exist_ok=True) + + # 加载配置 + self.config = self._load_config() + + # 当前时间 + self.current_time = datetime.now() + + logger.info("质量管理器初始化完成") + + def _load_config(self) -> Dict: + """加载配置 + + Returns: + Dict: 配置信息 + """ + config_file = os.path.join(self.config_dir, "data_quality_config.json") + + default_config = { + "quality_checks": { + "completeness": { + "enabled": True, + "check_missing_dates": True, + "min_date_coverage": 0.95, + "critical_threshold": 0.90 + }, + "accuracy": { + "enabled": True, + "check_price_logic": True, + "check_volume_consistency": True, + "check_financial_calc": True + }, + "consistency": { + "enabled": True, + "check_field_formats": True, + "check_data_types": True, + "check_value_ranges": True + } + }, + "update_schedule": { + "daily_update": { + "enabled": True, + "time": "18:00", + "data_types": ["daily"] + }, + "weekly_update": { + "enabled": True, + "day": "Sunday", + "time": "20:00", + "data_types": ["financial", "info"] + }, + "monthly_update": { + "enabled": True, + "day": "01", + "time": "22:00", + "data_types": ["all"] + } + }, + "monitoring": { + "alert_enabled": True, + "email_alerts": False, + "log_retention_days": 30, + "report_frequency": "daily" + } + } + + if os.path.exists(config_file): + try: + with open(config_file, 'r', encoding='utf-8') as f: + user_config = json.load(f) + default_config.update(user_config) + logger.info(f"从 {config_file} 加载用户配置") + except Exception as e: + logger.warning(f"加载用户配置文件失败: {e}") + # 保存默认配置 + self._save_config(default_config) + else: + # 保存默认配置 + self._save_config(default_config) + + return default_config + + def _save_config(self, config: Dict): + """保存配置 + + Args: + config: 配置信息 + """ + try: + config_file = os.path.join(self.config_dir, "data_quality_config.json") + with open(config_file, 'w', encoding='utf-8') as f: + json.dump(config, f, ensure_ascii=False, indent=2) + logger.info(f"配置文件已保存: {config_file}") + except Exception as e: + logger.error(f"保存配置文件失败: {e}") + + def check_data_completeness(self, data_type: str = "daily") -> Dict: + """检查数据完整性 + + Args: + data_type: 数据类型(daily, financial, info) + + Returns: + Dict: 完整性检查结果 + """ + logger.info(f"开始检查 {data_type} 数据完整性") + + result = { + "check_time": self.current_time.isoformat(), + "data_type": data_type, + "status": "unknown", + "metrics": {}, + "issues": [], + "recommendations": [] + } + + try: + # 根据数据类型确定检查路径 + if data_type == "daily": + data_path = os.path.join(self.base_dir, "processed", "a_stock_daily") + check_function = self._check_daily_completeness + elif data_type == "financial": + data_path = os.path.join(self.base_dir, "processed", "financial_indicators") + check_function = self._check_financial_completeness + elif data_type == "info": + data_path = os.path.join(self.base_dir, "processed", "stock_info") + check_function = self._check_info_completeness + else: + result["error"] = f"不支持的数据类型: {data_type}" + return result + + # 检查目录是否存在 + if not os.path.exists(data_path): + result["status"] = "failed" + result["issues"].append(f"数据目录不存在: {data_path}") + result["recommendations"].append(f"创建数据目录: {data_path}") + return result + + # 执行具体的完整性检查 + completeness_result = check_function(data_path) + + # 更新结果 + result.update(completeness_result) + + # 评估状态 + if result.get("status") == "unknown": + completeness_score = result.get("metrics", {}).get("completeness_score", 0) + if completeness_score >= self.config["quality_checks"]["completeness"]["min_date_coverage"]: + result["status"] = "good" + elif completeness_score >= self.config["quality_checks"]["completeness"]["critical_threshold"]: + result["status"] = "warning" + else: + result["status"] = "critical" + + logger.info(f"{data_type} 数据完整性检查完成: {result['status']}") + + except Exception as e: + result["status"] = "error" + result["error"] = str(e) + logger.error(f"完整性检查失败: {e}") + + return result + + def _check_daily_completeness(self, data_path: str) -> Dict: + """检查日线数据完整性 + + Args: + data_path: 日线数据路径 + + Returns: + Dict: 完整性检查结果 + """ + result = { + "metrics": { + "total_files": 0, + "total_records": 0, + "date_range": {}, + "missing_data": {}, + "completeness_score": 0 + }, + "issues": [], + "recommendations": [] + } + + try: + # 查找所有Parquet文件 + parquet_files = [] + for root, dirs, files in os.walk(data_path): + for file in files: + if file.endswith('.parquet'): + parquet_files.append(os.path.join(root, file)) + + result["metrics"]["total_files"] = len(parquet_files) + + if not parquet_files: + result["issues"].append("无日线数据文件") + return result + + # 加载股票基础信息,获取需要检查的股票列表 + info_path = os.path.join(self.base_dir, "processed", "stock_info") + stock_list = self._get_stock_list(info_path) + + if not stock_list: + result["issues"].append("无法获取股票列表") + return result + + # 检查每只股票的数据 + missing_issues = [] + for stock_symbol in stock_list[:100]: # 先检查前100只 + # 构建预期的文件名模式 + # 实际应用中需要更复杂的检查逻辑 + pass + + # 临时结果 + result["metrics"]["completeness_score"] = 0.85 + result["issues"].append("完整性检查功能待完善") + result["recommendations"].append("实现详细的数据完整性检查逻辑") + + except Exception as e: + result["issues"].append(f"检查过程中出错: {e}") + + return result + + def _check_financial_completeness(self, data_path: str) -> Dict: + """检查财务数据完整性 + + Args: + data_path: 财务数据路径 + + Returns: + Dict: 完整性检查结果 + """ + result = { + "metrics": { + "total_files": 0, + "total_records": 0, + "report_periods": {}, + "missing_data": {}, + "completeness_score": 0 + }, + "issues": [], + "recommendations": [] + } + + # 临时结果 + result["metrics"]["completeness_score"] = 0.80 + result["issues"].append("财务数据完整性检查待实现") + result["recommendations"].append("实现财务数据的季度/年度完整性检查") + + return result + + def _check_info_completeness(self, data_path: str) -> Dict: + """检查基础信息完整性 + + Args: + data_path: 基础信息路径 + + Returns: + Dict: 完整性检查结果 + """ + result = { + "metrics": { + "total_files": 0, + "total_records": 0, + "field_coverage": {}, + "missing_fields": [], + "completeness_score": 0 + }, + "issues": [], + "recommendations": [] + } + + # 临时结果 + result["metrics"]["completeness_score"] = 0.90 + result["issues"].append("基础信息完整性检查待优化") + result["recommendations"].append("实现完整的股票基础信息字段检查") + + return result + + def _get_stock_list(self, info_path: str) -> List[str]: + """获取股票代码列表 + + Args: + info_path: 基础信息路径 + + Returns: + List[str]: 股票代码列表 + """ + stock_list = [] + + try: + if os.path.exists(info_path): + # 查找最新的处理文件 + parquet_files = [f for f in os.listdir(info_path) if f.endswith('.parquet')] + + if parquet_files: + # 按时间排序,获取最新的文件 + latest_file = sorted(parquet_files)[-1] + file_path = os.path.join(info_path, latest_file) + + df = pd.read_parquet(file_path) + if 'symbol' in df.columns: + stock_list = df['symbol'].dropna().unique().tolist() + logger.info(f"从文件加载到 {len(stock_list)} 只股票代码") + + except Exception as e: + logger.warning(f"获取股票列表失败: {e}") + + return stock_list + + def check_data_accuracy(self, data_type: str = "daily") -> Dict: + """检查数据准确性 + + Args: + data_type: 数据类型 + + Returns: + Dict: 准确性检查结果 + """ + logger.info(f"开始检查 {data_type} 数据准确性") + + result = { + "check_time": self.current_time.isoformat(), + "data_type": data_type, + "status": "unknown", + "metrics": {}, + "issues": [], + "recommendations": [] + } + + try: + # 根据数据类型执行检查 + if data_type == "daily": + accuracy_result = self._check_daily_accuracy() + elif data_type == "financial": + accuracy_result = self._check_financial_accuracy() + else: + accuracy_result = { + "status": "unknown", + "issues": [f"准确性检查功能未实现: {data_type}"] + } + + # 合并结果 + result.update(accuracy_result) + + # 评估状态 + if result.get("status") == "unknown": + issues_count = len(result.get("issues", [])) + if issues_count == 0: + result["status"] = "good" + elif issues_count < 5: + result["status"] = "warning" + else: + result["status"] = "critical" + + logger.info(f"{data_type} 数据准确性检查完成: {result['status']}") + + except Exception as e: + result["status"] = "error" + result["error"] = str(e) + logger.error(f"准确性检查失败: {e}") + + return result + + def _check_daily_accuracy(self) -> Dict: + """检查日线数据准确性 + + Returns: + Dict: 准确性检查结果 + """ + result = { + "metrics": { + "price_logic_errors": 0, + "volume_consistency_errors": 0, + "accuracy_score": 0 + }, + "issues": [], + "recommendations": [] + } + + # 临时结果 + result["metrics"]["accuracy_score"] = 0.88 + result["issues"].append("价格逻辑检查待实现") + result["recommendations"].append("实现开盘价<=最高价>=收盘价>=最低价的检查逻辑") + + return result + + def _check_financial_accuracy(self) -> Dict: + """检查财务数据准确性 + + Returns: + Dict: 准确性检查结果 + """ + result = { + "metrics": { + "calculation_errors": 0, + "consistency_errors": 0, + "accuracy_score": 0 + }, + "issues": [], + "recommendations": [] + } + + # 临时结果 + result["metrics"]["accuracy_score"] = 0.82 + result["issues"].append("财务指标计算验证待实现") + result["recommendations"].append("实现ROE、毛利率等财务指标的计算验证") + + return result + + def run_quality_checks(self) -> Dict: + """运行所有质量检查 + + Returns: + Dict: 综合质量检查结果 + """ + logger.info("开始运行综合数据质量检查") + + results = { + "overall_status": "unknown", + "checks": [], + "summary": { + "total_checks": 0, + "passed_checks": 0, + "failed_checks": 0, + "critical_issues": 0, + "warnings": 0 + }, + "timestamp": self.current_time.isoformat() + } + + try: + # 检查基础信息数据 + info_result = self.check_data_completeness("info") + results["checks"].append(info_result) + + # 检查日线数据 + daily_result = self.check_data_completeness("daily") + results["checks"].append(daily_result) + + # 检查财务数据 + financial_result = self.check_data_completeness("financial") + results["checks"].append(financial_result) + + # 检查数据准确性 + daily_accuracy = self.check_data_accuracy("daily") + results["checks"].append(daily_accuracy) + + financial_accuracy = self.check_data_accuracy("financial") + results["checks"].append(financial_accuracy) + + # 计算总体状态 + statuses = [check.get("status", "unknown") for check in results["checks"]] + + if "critical" in statuses or "error" in statuses: + results["overall_status"] = "critical" + elif "warning" in statuses: + results["overall_status"] = "warning" + elif all(status == "good" for status in statuses): + results["overall_status"] = "good" + else: + results["overall_status"] = "warning" + + # 生成摘要 + results["summary"]["total_checks"] = len(results["checks"]) + results["summary"]["passed_checks"] = sum(1 for check in results["checks"] if check.get("status") == "good") + results["summary"]["failed_checks"] = sum(1 for check in results["checks"] if check.get("status") in ["critical", "error"]) + results["summary"]["critical_issues"] = sum(1 for check in results["checks"] if check.get("status") == "critical") + results["summary"]["warnings"] = sum(1 for check in results["checks"] if check.get("status") == "warning") + + # 保存质量报告 + self._save_quality_report(results) + + logger.info(f"综合质量检查完成: {results['overall_status']}") + + except Exception as e: + results["overall_status"] = "error" + results["error"] = str(e) + logger.error(f"综合质量检查失败: {e}") + + return results + + def _save_quality_report(self, results: Dict): + """保存质量检查报告 + + Args: + results: 质量检查结果 + """ + try: + timestamp = self.current_time.strftime('%Y%m%d_%H%M%S') + + # 保存详细报告 + report_file = os.path.join(self.quality_dir, f"quality_report_{timestamp}.json") + with open(report_file, 'w', encoding='utf-8') as f: + json.dump(results, f, ensure_ascii=False, indent=2) + + logger.info(f"质量检查报告已保存: {report_file}") + + # 保存摘要报告 + summary = { + "timestamp": results["timestamp"], + "overall_status": results["overall_status"], + "summary": results["summary"], + "critical_issues": [], + "recommendations": [] + } + + # 提取关键问题和建议 + for check in results["checks"]: + if check.get("status") in ["critical", "error"]: + summary["critical_issues"].extend(check.get("issues", [])) + summary["recommendations"].extend(check.get("recommendations", [])) + + summary_file = os.path.join(self.quality_dir, f"quality_summary_{timestamp}.json") + with open(summary_file, 'w', encoding='utf-8') as f: + json.dump(summary, f, ensure_ascii=False, indent=2) + + logger.info(f"质量检查摘要已保存: {summary_file}") + + except Exception as e: + logger.error(f"保存质量检查报告失败: {e}") + + def run_update_check(self) -> Dict: + """运行数据更新检查 + + Returns: + Dict: 更新检查结果 + """ + logger.info("开始检查数据更新情况") + + result = { + "check_time": self.current_time.isoformat(), + "update_status": "unknown", + "last_update": {}, + "update_needed": False, + "update_tasks": [] + } + + try: + # 检查更新日志目录 + if os.path.exists(self.update_logs_dir): + # 查找最新的更新日志 + log_files = [f for f in os.listdir(self.update_logs_dir) if f.endswith('.json')] + + if log_files: + # 按时间排序,获取最新的日志 + latest_file = sorted(log_files)[-1] + log_path = os.path.join(self.update_logs_dir, latest_file) + + try: + with open(log_path, 'r', encoding='utf-8') as f: + last_log = json.load(f) + + result["last_update"] = last_log + + # 判断是否需要更新 + last_update_time = last_log.get("timestamp") + if last_update_time: + last_dt = datetime.fromisoformat(last_update_time) + time_diff = self.current_time - last_dt + + # 根据配置判断更新时间间隔 + if time_diff.days >= 1: + result["update_needed"] = True + result["update_status"] = "needs_update" + result["update_tasks"].append("daily") + + if time_diff.days >= 7: + result["update_tasks"].extend(["financial", "info"]) + + if time_diff.days >= 30: + result["update_tasks"].append("all") + + except Exception as e: + logger.warning(f"读取更新日志失败: {e}") + + # 如果没有更新日志,说明需要初次更新 + if not result["last_update"]: + result["update_needed"] = True + result["update_status"] = "initial_update_needed" + result["update_tasks"] = ["all"] + + logger.info(f"数据更新检查完成: {result['update_status']}") + + except Exception as e: + result["update_status"] = "error" + result["error"] = str(e) + logger.error(f"更新检查失败: {e}") + + return result + + +def main(): + """主函数""" + print("=" * 70) + print("📊 A股数据质量管理") + print("=" * 70) + + # 创建质量管理器 + quality_manager = AStockDataQualityManager() + + # 运行综合质量检查 + print("开始运行综合数据质量检查...") + quality_results = quality_manager.run_quality_checks() + + # 运行更新检查 + print("开始检查数据更新情况...") + update_results = quality_manager.run_update_check() + + # 输出结果 + print("\n" + "=" * 70) + print("📋 质量检查结果") + print("=" * 70) + + if quality_results["overall_status"] != "error": + print(f"🔍 总体状态: {quality_results['overall_status'].upper()}") + print(f"📈 检查统计:") + print(f" 总检查项: {quality_results['summary']['total_checks']}") + print(f" 通过项: {quality_results['summary']['passed_checks']}") + print(f" 失败项: {quality_results['summary']['failed_checks']}") + print(f" 关键问题: {quality_results['summary']['critical_issues']}") + print(f" 警告项: {quality_results['summary']['warnings']}") + + # 输出更新检查结果 + print(f"\n🔄 更新检查结果:") + print(f" 更新状态: {update_results['update_status']}") + print(f" 需要更新: {'是' if update_results['update_needed'] else '否'}") + + if update_results["update_tasks"]: + print(f" 更新任务: {', '.join(update_results['update_tasks'])}") + + # 输出关键问题和建议 + print(f"\n⚠️ 关键问题:") + for check in quality_results["checks"]: + if check.get("status") in ["critical", "error"] and check.get("issues"): + for issue in check.get("issues", []): + print(f" • {issue}") + + print(f"\n💡 改进建议:") + for check in quality_results["checks"]: + if check.get("recommendations"): + for rec in check.get("recommendations", []): + print(f" • {rec}") + + # 输出保存的报告文件 + timestamp = quality_manager.current_time.strftime('%Y%m%d_%H%M%S') + report_file = os.path.join(quality_manager.quality_dir, f"quality_report_{timestamp}.json") + summary_file = os.path.join(quality_manager.quality_dir, f"quality_summary_{timestamp}.json") + + print(f"\n📄 报告文件:") + print(f" 详细报告: {report_file}") + print(f" 摘要报告: {summary_file}") + + # 根据检查结果给出行动建议 + print(f"\n🎯 建议行动:") + if quality_results["overall_status"] == "critical": + print(f" • ⚠️ 立即修复关键数据问题") + print(f" • 🔧 检查数据采集流程") + print(f" • 📊 重新采集问题数据") + elif quality_results["overl_status"] == "warning": + print(f" • 🔍 监控警告问题") + print(f" • ⚡ 按计划优化数据质量") + print(f" • 📈 定期运行质量检查") + elif quality_results["overall_status"] == "good": + print(f" • ✅ 数据质量良好,继续维护") + print(f" • 🔄 按更新计划执行") + print(f" • 🛡️ 保持监控和预防") + + if update_results["update_needed"]: + print(f"\n🔄 需要执行更新:") + for task in update_results["update_tasks"]: + print(f" • {task}数据更新") + + else: + print(f"❌ 质量检查失败") + print(f"错误信息: {quality_results.get('error', '未知错误')}") + print(f"请检查数据目录结构和配置文件") + + print("=" * 70) + + +if __name__ == "__main__": + main() \ No newline at end of file