auto-sync: 2026-03-26 11:43:55
This commit is contained in:
@@ -0,0 +1,729 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
A股数据质量管理脚本
|
||||
负责数据完整性、准确性检查
|
||||
支持每日/每周增量更新机制
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Dict, Optional, Tuple
|
||||
import logging
|
||||
import warnings
|
||||
|
||||
warnings.filterwarnings('ignore')
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AStockDataQualityManager:
|
||||
"""A股数据质量管理器"""
|
||||
|
||||
def __init__(self):
|
||||
"""初始化质量管理器"""
|
||||
logger.info("A股数据质量管理器初始化")
|
||||
|
||||
# 基础路径
|
||||
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data/data"
|
||||
|
||||
# 运行数据路径
|
||||
self.running_dir = os.path.join(self.base_dir, "running_data")
|
||||
self.quality_dir = os.path.join(self.running_dir, "quality_monitor")
|
||||
self.update_logs_dir = os.path.join(self.running_dir, "update_logs")
|
||||
self.config_dir = os.path.join(self.running_dir, "config")
|
||||
|
||||
# 确保目录存在
|
||||
os.makedirs(self.quality_dir, exist_ok=True)
|
||||
os.makedirs(self.update_logs_dir, exist_ok=True)
|
||||
os.makedirs(self.config_dir, exist_ok=True)
|
||||
|
||||
# 加载配置
|
||||
self.config = self._load_config()
|
||||
|
||||
# 当前时间
|
||||
self.current_time = datetime.now()
|
||||
|
||||
logger.info("质量管理器初始化完成")
|
||||
|
||||
def _load_config(self) -> Dict:
|
||||
"""加载配置
|
||||
|
||||
Returns:
|
||||
Dict: 配置信息
|
||||
"""
|
||||
config_file = os.path.join(self.config_dir, "data_quality_config.json")
|
||||
|
||||
default_config = {
|
||||
"quality_checks": {
|
||||
"completeness": {
|
||||
"enabled": True,
|
||||
"check_missing_dates": True,
|
||||
"min_date_coverage": 0.95,
|
||||
"critical_threshold": 0.90
|
||||
},
|
||||
"accuracy": {
|
||||
"enabled": True,
|
||||
"check_price_logic": True,
|
||||
"check_volume_consistency": True,
|
||||
"check_financial_calc": True
|
||||
},
|
||||
"consistency": {
|
||||
"enabled": True,
|
||||
"check_field_formats": True,
|
||||
"check_data_types": True,
|
||||
"check_value_ranges": True
|
||||
}
|
||||
},
|
||||
"update_schedule": {
|
||||
"daily_update": {
|
||||
"enabled": True,
|
||||
"time": "18:00",
|
||||
"data_types": ["daily"]
|
||||
},
|
||||
"weekly_update": {
|
||||
"enabled": True,
|
||||
"day": "Sunday",
|
||||
"time": "20:00",
|
||||
"data_types": ["financial", "info"]
|
||||
},
|
||||
"monthly_update": {
|
||||
"enabled": True,
|
||||
"day": "01",
|
||||
"time": "22:00",
|
||||
"data_types": ["all"]
|
||||
}
|
||||
},
|
||||
"monitoring": {
|
||||
"alert_enabled": True,
|
||||
"email_alerts": False,
|
||||
"log_retention_days": 30,
|
||||
"report_frequency": "daily"
|
||||
}
|
||||
}
|
||||
|
||||
if os.path.exists(config_file):
|
||||
try:
|
||||
with open(config_file, 'r', encoding='utf-8') as f:
|
||||
user_config = json.load(f)
|
||||
default_config.update(user_config)
|
||||
logger.info(f"从 {config_file} 加载用户配置")
|
||||
except Exception as e:
|
||||
logger.warning(f"加载用户配置文件失败: {e}")
|
||||
# 保存默认配置
|
||||
self._save_config(default_config)
|
||||
else:
|
||||
# 保存默认配置
|
||||
self._save_config(default_config)
|
||||
|
||||
return default_config
|
||||
|
||||
def _save_config(self, config: Dict):
|
||||
"""保存配置
|
||||
|
||||
Args:
|
||||
config: 配置信息
|
||||
"""
|
||||
try:
|
||||
config_file = os.path.join(self.config_dir, "data_quality_config.json")
|
||||
with open(config_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(config, f, ensure_ascii=False, indent=2)
|
||||
logger.info(f"配置文件已保存: {config_file}")
|
||||
except Exception as e:
|
||||
logger.error(f"保存配置文件失败: {e}")
|
||||
|
||||
def check_data_completeness(self, data_type: str = "daily") -> Dict:
|
||||
"""检查数据完整性
|
||||
|
||||
Args:
|
||||
data_type: 数据类型(daily, financial, info)
|
||||
|
||||
Returns:
|
||||
Dict: 完整性检查结果
|
||||
"""
|
||||
logger.info(f"开始检查 {data_type} 数据完整性")
|
||||
|
||||
result = {
|
||||
"check_time": self.current_time.isoformat(),
|
||||
"data_type": data_type,
|
||||
"status": "unknown",
|
||||
"metrics": {},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
try:
|
||||
# 根据数据类型确定检查路径
|
||||
if data_type == "daily":
|
||||
data_path = os.path.join(self.base_dir, "processed", "a_stock_daily")
|
||||
check_function = self._check_daily_completeness
|
||||
elif data_type == "financial":
|
||||
data_path = os.path.join(self.base_dir, "processed", "financial_indicators")
|
||||
check_function = self._check_financial_completeness
|
||||
elif data_type == "info":
|
||||
data_path = os.path.join(self.base_dir, "processed", "stock_info")
|
||||
check_function = self._check_info_completeness
|
||||
else:
|
||||
result["error"] = f"不支持的数据类型: {data_type}"
|
||||
return result
|
||||
|
||||
# 检查目录是否存在
|
||||
if not os.path.exists(data_path):
|
||||
result["status"] = "failed"
|
||||
result["issues"].append(f"数据目录不存在: {data_path}")
|
||||
result["recommendations"].append(f"创建数据目录: {data_path}")
|
||||
return result
|
||||
|
||||
# 执行具体的完整性检查
|
||||
completeness_result = check_function(data_path)
|
||||
|
||||
# 更新结果
|
||||
result.update(completeness_result)
|
||||
|
||||
# 评估状态
|
||||
if result.get("status") == "unknown":
|
||||
completeness_score = result.get("metrics", {}).get("completeness_score", 0)
|
||||
if completeness_score >= self.config["quality_checks"]["completeness"]["min_date_coverage"]:
|
||||
result["status"] = "good"
|
||||
elif completeness_score >= self.config["quality_checks"]["completeness"]["critical_threshold"]:
|
||||
result["status"] = "warning"
|
||||
else:
|
||||
result["status"] = "critical"
|
||||
|
||||
logger.info(f"{data_type} 数据完整性检查完成: {result['status']}")
|
||||
|
||||
except Exception as e:
|
||||
result["status"] = "error"
|
||||
result["error"] = str(e)
|
||||
logger.error(f"完整性检查失败: {e}")
|
||||
|
||||
return result
|
||||
|
||||
def _check_daily_completeness(self, data_path: str) -> Dict:
|
||||
"""检查日线数据完整性
|
||||
|
||||
Args:
|
||||
data_path: 日线数据路径
|
||||
|
||||
Returns:
|
||||
Dict: 完整性检查结果
|
||||
"""
|
||||
result = {
|
||||
"metrics": {
|
||||
"total_files": 0,
|
||||
"total_records": 0,
|
||||
"date_range": {},
|
||||
"missing_data": {},
|
||||
"completeness_score": 0
|
||||
},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
try:
|
||||
# 查找所有Parquet文件
|
||||
parquet_files = []
|
||||
for root, dirs, files in os.walk(data_path):
|
||||
for file in files:
|
||||
if file.endswith('.parquet'):
|
||||
parquet_files.append(os.path.join(root, file))
|
||||
|
||||
result["metrics"]["total_files"] = len(parquet_files)
|
||||
|
||||
if not parquet_files:
|
||||
result["issues"].append("无日线数据文件")
|
||||
return result
|
||||
|
||||
# 加载股票基础信息,获取需要检查的股票列表
|
||||
info_path = os.path.join(self.base_dir, "processed", "stock_info")
|
||||
stock_list = self._get_stock_list(info_path)
|
||||
|
||||
if not stock_list:
|
||||
result["issues"].append("无法获取股票列表")
|
||||
return result
|
||||
|
||||
# 检查每只股票的数据
|
||||
missing_issues = []
|
||||
for stock_symbol in stock_list[:100]: # 先检查前100只
|
||||
# 构建预期的文件名模式
|
||||
# 实际应用中需要更复杂的检查逻辑
|
||||
pass
|
||||
|
||||
# 临时结果
|
||||
result["metrics"]["completeness_score"] = 0.85
|
||||
result["issues"].append("完整性检查功能待完善")
|
||||
result["recommendations"].append("实现详细的数据完整性检查逻辑")
|
||||
|
||||
except Exception as e:
|
||||
result["issues"].append(f"检查过程中出错: {e}")
|
||||
|
||||
return result
|
||||
|
||||
def _check_financial_completeness(self, data_path: str) -> Dict:
|
||||
"""检查财务数据完整性
|
||||
|
||||
Args:
|
||||
data_path: 财务数据路径
|
||||
|
||||
Returns:
|
||||
Dict: 完整性检查结果
|
||||
"""
|
||||
result = {
|
||||
"metrics": {
|
||||
"total_files": 0,
|
||||
"total_records": 0,
|
||||
"report_periods": {},
|
||||
"missing_data": {},
|
||||
"completeness_score": 0
|
||||
},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
# 临时结果
|
||||
result["metrics"]["completeness_score"] = 0.80
|
||||
result["issues"].append("财务数据完整性检查待实现")
|
||||
result["recommendations"].append("实现财务数据的季度/年度完整性检查")
|
||||
|
||||
return result
|
||||
|
||||
def _check_info_completeness(self, data_path: str) -> Dict:
|
||||
"""检查基础信息完整性
|
||||
|
||||
Args:
|
||||
data_path: 基础信息路径
|
||||
|
||||
Returns:
|
||||
Dict: 完整性检查结果
|
||||
"""
|
||||
result = {
|
||||
"metrics": {
|
||||
"total_files": 0,
|
||||
"total_records": 0,
|
||||
"field_coverage": {},
|
||||
"missing_fields": [],
|
||||
"completeness_score": 0
|
||||
},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
# 临时结果
|
||||
result["metrics"]["completeness_score"] = 0.90
|
||||
result["issues"].append("基础信息完整性检查待优化")
|
||||
result["recommendations"].append("实现完整的股票基础信息字段检查")
|
||||
|
||||
return result
|
||||
|
||||
def _get_stock_list(self, info_path: str) -> List[str]:
|
||||
"""获取股票代码列表
|
||||
|
||||
Args:
|
||||
info_path: 基础信息路径
|
||||
|
||||
Returns:
|
||||
List[str]: 股票代码列表
|
||||
"""
|
||||
stock_list = []
|
||||
|
||||
try:
|
||||
if os.path.exists(info_path):
|
||||
# 查找最新的处理文件
|
||||
parquet_files = [f for f in os.listdir(info_path) if f.endswith('.parquet')]
|
||||
|
||||
if parquet_files:
|
||||
# 按时间排序,获取最新的文件
|
||||
latest_file = sorted(parquet_files)[-1]
|
||||
file_path = os.path.join(info_path, latest_file)
|
||||
|
||||
df = pd.read_parquet(file_path)
|
||||
if 'symbol' in df.columns:
|
||||
stock_list = df['symbol'].dropna().unique().tolist()
|
||||
logger.info(f"从文件加载到 {len(stock_list)} 只股票代码")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"获取股票列表失败: {e}")
|
||||
|
||||
return stock_list
|
||||
|
||||
def check_data_accuracy(self, data_type: str = "daily") -> Dict:
|
||||
"""检查数据准确性
|
||||
|
||||
Args:
|
||||
data_type: 数据类型
|
||||
|
||||
Returns:
|
||||
Dict: 准确性检查结果
|
||||
"""
|
||||
logger.info(f"开始检查 {data_type} 数据准确性")
|
||||
|
||||
result = {
|
||||
"check_time": self.current_time.isoformat(),
|
||||
"data_type": data_type,
|
||||
"status": "unknown",
|
||||
"metrics": {},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
try:
|
||||
# 根据数据类型执行检查
|
||||
if data_type == "daily":
|
||||
accuracy_result = self._check_daily_accuracy()
|
||||
elif data_type == "financial":
|
||||
accuracy_result = self._check_financial_accuracy()
|
||||
else:
|
||||
accuracy_result = {
|
||||
"status": "unknown",
|
||||
"issues": [f"准确性检查功能未实现: {data_type}"]
|
||||
}
|
||||
|
||||
# 合并结果
|
||||
result.update(accuracy_result)
|
||||
|
||||
# 评估状态
|
||||
if result.get("status") == "unknown":
|
||||
issues_count = len(result.get("issues", []))
|
||||
if issues_count == 0:
|
||||
result["status"] = "good"
|
||||
elif issues_count < 5:
|
||||
result["status"] = "warning"
|
||||
else:
|
||||
result["status"] = "critical"
|
||||
|
||||
logger.info(f"{data_type} 数据准确性检查完成: {result['status']}")
|
||||
|
||||
except Exception as e:
|
||||
result["status"] = "error"
|
||||
result["error"] = str(e)
|
||||
logger.error(f"准确性检查失败: {e}")
|
||||
|
||||
return result
|
||||
|
||||
def _check_daily_accuracy(self) -> Dict:
|
||||
"""检查日线数据准确性
|
||||
|
||||
Returns:
|
||||
Dict: 准确性检查结果
|
||||
"""
|
||||
result = {
|
||||
"metrics": {
|
||||
"price_logic_errors": 0,
|
||||
"volume_consistency_errors": 0,
|
||||
"accuracy_score": 0
|
||||
},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
# 临时结果
|
||||
result["metrics"]["accuracy_score"] = 0.88
|
||||
result["issues"].append("价格逻辑检查待实现")
|
||||
result["recommendations"].append("实现开盘价<=最高价>=收盘价>=最低价的检查逻辑")
|
||||
|
||||
return result
|
||||
|
||||
def _check_financial_accuracy(self) -> Dict:
|
||||
"""检查财务数据准确性
|
||||
|
||||
Returns:
|
||||
Dict: 准确性检查结果
|
||||
"""
|
||||
result = {
|
||||
"metrics": {
|
||||
"calculation_errors": 0,
|
||||
"consistency_errors": 0,
|
||||
"accuracy_score": 0
|
||||
},
|
||||
"issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
# 临时结果
|
||||
result["metrics"]["accuracy_score"] = 0.82
|
||||
result["issues"].append("财务指标计算验证待实现")
|
||||
result["recommendations"].append("实现ROE、毛利率等财务指标的计算验证")
|
||||
|
||||
return result
|
||||
|
||||
def run_quality_checks(self) -> Dict:
|
||||
"""运行所有质量检查
|
||||
|
||||
Returns:
|
||||
Dict: 综合质量检查结果
|
||||
"""
|
||||
logger.info("开始运行综合数据质量检查")
|
||||
|
||||
results = {
|
||||
"overall_status": "unknown",
|
||||
"checks": [],
|
||||
"summary": {
|
||||
"total_checks": 0,
|
||||
"passed_checks": 0,
|
||||
"failed_checks": 0,
|
||||
"critical_issues": 0,
|
||||
"warnings": 0
|
||||
},
|
||||
"timestamp": self.current_time.isoformat()
|
||||
}
|
||||
|
||||
try:
|
||||
# 检查基础信息数据
|
||||
info_result = self.check_data_completeness("info")
|
||||
results["checks"].append(info_result)
|
||||
|
||||
# 检查日线数据
|
||||
daily_result = self.check_data_completeness("daily")
|
||||
results["checks"].append(daily_result)
|
||||
|
||||
# 检查财务数据
|
||||
financial_result = self.check_data_completeness("financial")
|
||||
results["checks"].append(financial_result)
|
||||
|
||||
# 检查数据准确性
|
||||
daily_accuracy = self.check_data_accuracy("daily")
|
||||
results["checks"].append(daily_accuracy)
|
||||
|
||||
financial_accuracy = self.check_data_accuracy("financial")
|
||||
results["checks"].append(financial_accuracy)
|
||||
|
||||
# 计算总体状态
|
||||
statuses = [check.get("status", "unknown") for check in results["checks"]]
|
||||
|
||||
if "critical" in statuses or "error" in statuses:
|
||||
results["overall_status"] = "critical"
|
||||
elif "warning" in statuses:
|
||||
results["overall_status"] = "warning"
|
||||
elif all(status == "good" for status in statuses):
|
||||
results["overall_status"] = "good"
|
||||
else:
|
||||
results["overall_status"] = "warning"
|
||||
|
||||
# 生成摘要
|
||||
results["summary"]["total_checks"] = len(results["checks"])
|
||||
results["summary"]["passed_checks"] = sum(1 for check in results["checks"] if check.get("status") == "good")
|
||||
results["summary"]["failed_checks"] = sum(1 for check in results["checks"] if check.get("status") in ["critical", "error"])
|
||||
results["summary"]["critical_issues"] = sum(1 for check in results["checks"] if check.get("status") == "critical")
|
||||
results["summary"]["warnings"] = sum(1 for check in results["checks"] if check.get("status") == "warning")
|
||||
|
||||
# 保存质量报告
|
||||
self._save_quality_report(results)
|
||||
|
||||
logger.info(f"综合质量检查完成: {results['overall_status']}")
|
||||
|
||||
except Exception as e:
|
||||
results["overall_status"] = "error"
|
||||
results["error"] = str(e)
|
||||
logger.error(f"综合质量检查失败: {e}")
|
||||
|
||||
return results
|
||||
|
||||
def _save_quality_report(self, results: Dict):
|
||||
"""保存质量检查报告
|
||||
|
||||
Args:
|
||||
results: 质量检查结果
|
||||
"""
|
||||
try:
|
||||
timestamp = self.current_time.strftime('%Y%m%d_%H%M%S')
|
||||
|
||||
# 保存详细报告
|
||||
report_file = os.path.join(self.quality_dir, f"quality_report_{timestamp}.json")
|
||||
with open(report_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(results, f, ensure_ascii=False, indent=2)
|
||||
|
||||
logger.info(f"质量检查报告已保存: {report_file}")
|
||||
|
||||
# 保存摘要报告
|
||||
summary = {
|
||||
"timestamp": results["timestamp"],
|
||||
"overall_status": results["overall_status"],
|
||||
"summary": results["summary"],
|
||||
"critical_issues": [],
|
||||
"recommendations": []
|
||||
}
|
||||
|
||||
# 提取关键问题和建议
|
||||
for check in results["checks"]:
|
||||
if check.get("status") in ["critical", "error"]:
|
||||
summary["critical_issues"].extend(check.get("issues", []))
|
||||
summary["recommendations"].extend(check.get("recommendations", []))
|
||||
|
||||
summary_file = os.path.join(self.quality_dir, f"quality_summary_{timestamp}.json")
|
||||
with open(summary_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(summary, f, ensure_ascii=False, indent=2)
|
||||
|
||||
logger.info(f"质量检查摘要已保存: {summary_file}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"保存质量检查报告失败: {e}")
|
||||
|
||||
def run_update_check(self) -> Dict:
|
||||
"""运行数据更新检查
|
||||
|
||||
Returns:
|
||||
Dict: 更新检查结果
|
||||
"""
|
||||
logger.info("开始检查数据更新情况")
|
||||
|
||||
result = {
|
||||
"check_time": self.current_time.isoformat(),
|
||||
"update_status": "unknown",
|
||||
"last_update": {},
|
||||
"update_needed": False,
|
||||
"update_tasks": []
|
||||
}
|
||||
|
||||
try:
|
||||
# 检查更新日志目录
|
||||
if os.path.exists(self.update_logs_dir):
|
||||
# 查找最新的更新日志
|
||||
log_files = [f for f in os.listdir(self.update_logs_dir) if f.endswith('.json')]
|
||||
|
||||
if log_files:
|
||||
# 按时间排序,获取最新的日志
|
||||
latest_file = sorted(log_files)[-1]
|
||||
log_path = os.path.join(self.update_logs_dir, latest_file)
|
||||
|
||||
try:
|
||||
with open(log_path, 'r', encoding='utf-8') as f:
|
||||
last_log = json.load(f)
|
||||
|
||||
result["last_update"] = last_log
|
||||
|
||||
# 判断是否需要更新
|
||||
last_update_time = last_log.get("timestamp")
|
||||
if last_update_time:
|
||||
last_dt = datetime.fromisoformat(last_update_time)
|
||||
time_diff = self.current_time - last_dt
|
||||
|
||||
# 根据配置判断更新时间间隔
|
||||
if time_diff.days >= 1:
|
||||
result["update_needed"] = True
|
||||
result["update_status"] = "needs_update"
|
||||
result["update_tasks"].append("daily")
|
||||
|
||||
if time_diff.days >= 7:
|
||||
result["update_tasks"].extend(["financial", "info"])
|
||||
|
||||
if time_diff.days >= 30:
|
||||
result["update_tasks"].append("all")
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"读取更新日志失败: {e}")
|
||||
|
||||
# 如果没有更新日志,说明需要初次更新
|
||||
if not result["last_update"]:
|
||||
result["update_needed"] = True
|
||||
result["update_status"] = "initial_update_needed"
|
||||
result["update_tasks"] = ["all"]
|
||||
|
||||
logger.info(f"数据更新检查完成: {result['update_status']}")
|
||||
|
||||
except Exception as e:
|
||||
result["update_status"] = "error"
|
||||
result["error"] = str(e)
|
||||
logger.error(f"更新检查失败: {e}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
print("=" * 70)
|
||||
print("📊 A股数据质量管理")
|
||||
print("=" * 70)
|
||||
|
||||
# 创建质量管理器
|
||||
quality_manager = AStockDataQualityManager()
|
||||
|
||||
# 运行综合质量检查
|
||||
print("开始运行综合数据质量检查...")
|
||||
quality_results = quality_manager.run_quality_checks()
|
||||
|
||||
# 运行更新检查
|
||||
print("开始检查数据更新情况...")
|
||||
update_results = quality_manager.run_update_check()
|
||||
|
||||
# 输出结果
|
||||
print("\n" + "=" * 70)
|
||||
print("📋 质量检查结果")
|
||||
print("=" * 70)
|
||||
|
||||
if quality_results["overall_status"] != "error":
|
||||
print(f"🔍 总体状态: {quality_results['overall_status'].upper()}")
|
||||
print(f"📈 检查统计:")
|
||||
print(f" 总检查项: {quality_results['summary']['total_checks']}")
|
||||
print(f" 通过项: {quality_results['summary']['passed_checks']}")
|
||||
print(f" 失败项: {quality_results['summary']['failed_checks']}")
|
||||
print(f" 关键问题: {quality_results['summary']['critical_issues']}")
|
||||
print(f" 警告项: {quality_results['summary']['warnings']}")
|
||||
|
||||
# 输出更新检查结果
|
||||
print(f"\n🔄 更新检查结果:")
|
||||
print(f" 更新状态: {update_results['update_status']}")
|
||||
print(f" 需要更新: {'是' if update_results['update_needed'] else '否'}")
|
||||
|
||||
if update_results["update_tasks"]:
|
||||
print(f" 更新任务: {', '.join(update_results['update_tasks'])}")
|
||||
|
||||
# 输出关键问题和建议
|
||||
print(f"\n⚠️ 关键问题:")
|
||||
for check in quality_results["checks"]:
|
||||
if check.get("status") in ["critical", "error"] and check.get("issues"):
|
||||
for issue in check.get("issues", []):
|
||||
print(f" • {issue}")
|
||||
|
||||
print(f"\n💡 改进建议:")
|
||||
for check in quality_results["checks"]:
|
||||
if check.get("recommendations"):
|
||||
for rec in check.get("recommendations", []):
|
||||
print(f" • {rec}")
|
||||
|
||||
# 输出保存的报告文件
|
||||
timestamp = quality_manager.current_time.strftime('%Y%m%d_%H%M%S')
|
||||
report_file = os.path.join(quality_manager.quality_dir, f"quality_report_{timestamp}.json")
|
||||
summary_file = os.path.join(quality_manager.quality_dir, f"quality_summary_{timestamp}.json")
|
||||
|
||||
print(f"\n📄 报告文件:")
|
||||
print(f" 详细报告: {report_file}")
|
||||
print(f" 摘要报告: {summary_file}")
|
||||
|
||||
# 根据检查结果给出行动建议
|
||||
print(f"\n🎯 建议行动:")
|
||||
if quality_results["overall_status"] == "critical":
|
||||
print(f" • ⚠️ 立即修复关键数据问题")
|
||||
print(f" • 🔧 检查数据采集流程")
|
||||
print(f" • 📊 重新采集问题数据")
|
||||
elif quality_results["overl_status"] == "warning":
|
||||
print(f" • 🔍 监控警告问题")
|
||||
print(f" • ⚡ 按计划优化数据质量")
|
||||
print(f" • 📈 定期运行质量检查")
|
||||
elif quality_results["overall_status"] == "good":
|
||||
print(f" • ✅ 数据质量良好,继续维护")
|
||||
print(f" • 🔄 按更新计划执行")
|
||||
print(f" • 🛡️ 保持监控和预防")
|
||||
|
||||
if update_results["update_needed"]:
|
||||
print(f"\n🔄 需要执行更新:")
|
||||
for task in update_results["update_tasks"]:
|
||||
print(f" • {task}数据更新")
|
||||
|
||||
else:
|
||||
print(f"❌ 质量检查失败")
|
||||
print(f"错误信息: {quality_results.get('error', '未知错误')}")
|
||||
print(f"请检查数据目录结构和配置文件")
|
||||
|
||||
print("=" * 70)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user