diff --git a/zhaoyun-data/scripts/data_acquisition/a_stock_data_main.py b/zhaoyun-data/scripts/data_acquisition/a_stock_data_main.py new file mode 100644 index 000000000..811266e2b --- /dev/null +++ b/zhaoyun-data/scripts/data_acquisition/a_stock_data_main.py @@ -0,0 +1,627 @@ +#!/usr/bin/env python3 +""" +A股数据准备综合主控脚本 +统一管理基础信息、日线数据、财务数据采集 +""" +import sys +import os +import time +import json +import pandas as pd +from datetime import datetime +from typing import List, Dict, Optional +import logging +import warnings + +warnings.filterwarnings('ignore') + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class AStockDataMainController: + """A股数据主控制器""" + + def __init__(self): + """初始化控制器""" + logger.info("A股数据主控制器初始化") + + # 基础路径 + self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data" + self.data_dir = os.path.join(self.base_dir, "data") + self.scripts_dir = os.path.join(self.base_dir, "scripts") + self.reports_dir = os.path.join(self.base_dir, "reports") + + # 确保目录存在 + os.makedirs(self.reports_dir, exist_ok=True) + + # 当前时间 + self.start_time = datetime.now() + self.task_id = self.start_time.strftime('%Y%m%d_%H%M%S') + + # 任务执行状态 + self.task_status = { + "task_id": self.task_id, + "start_time": self.start_time.isoformat(), + "end_time": None, + "overall_status": "pending", + "tasks": {}, + "summary": {} + } + + logger.info(f"任务ID: {self.task_id}") + + def run_basic_info_task(self, test_mode: bool = True) -> Dict: + """运行基础信息采集任务 + + Args: + test_mode: 测试模式(少量数据) + + Returns: + Dict: 任务执行结果 + """ + logger.info("开始基础信息采集任务") + + task_result = { + "task_name": "basic_info", + "start_time": datetime.now().isoformat(), + "status": "running", + "result": None + } + + try: + # 导入基础信息采集器 + sys.path.append(self.scripts_dir) + + try: + from data_acquisition.a_stock_basic_info import AStockBasicInfoCollector + + # 创建采集器 + collector = AStockBasicInfoCollector() + + # 运行采集 + result = collector.run() + + task_result["result"] = result + task_result["end_time"] = datetime.now().isoformat() + + if result.get("success", False): + task_result["status"] = "success" + logger.info("基础信息采集任务成功") + else: + task_result["status"] = "failed" + logger.warning("基础信息采集任务失败") + + except ImportError as e: + task_result["status"] = "failed" + task_result["error"] = f"导入采集器失败: {e}" + logger.error(f"导入采集器失败: {e}") + + except Exception as e: + task_result["status"] = "error" + task_result["error"] = str(e) + logger.error(f"基础信息采集任务异常: {e}") + + self.task_status["tasks"]["basic_info"] = task_result + + return task_result + + def run_daily_data_task(self, test_mode: bool = True) -> Dict: + """运行日线数据采集任务 + + Args: + test_mode: 测试模式(少量数据) + + Returns: + Dict: 任务执行结果 + """ + logger.info("开始日线数据采集任务") + + task_result = { + "task_name": "daily_data", + "start_time": datetime.now().isoformat(), + "status": "running", + "result": None + } + + try: + # 导入日线数据采集器 + sys.path.append(self.scripts_dir) + + try: + from data_acquisition.a_stock_daily_data import AStockDailyDataCollector + + # 创建采集器 + collector = AStockDailyDataCollector() + + # 获取股票列表 + stock_list = collector.load_stock_list() + + if not stock_list: + task_result["status"] = "failed" + task_result["error"] = "未获取到股票列表" + logger.error("未获取到股票列表") + else: + # 测试模式下只处理少量股票 + if test_mode: + test_stocks = stock_list[:20] + logger.info(f"测试模式: 只处理 {len(test_stocks)} 只股票") + else: + test_stocks = stock_list + + # 运行采集 + result = collector.batch_collect_daily_data( + stock_list=test_stocks, + batch_size=10, + max_workers=5 + ) + + task_result["result"] = result + task_result["end_time"] = datetime.now().isoformat() + + if result.get("success", False): + task_result["status"] = "success" + logger.info("日线数据采集任务成功") + else: + task_result["status"] = "partial" + logger.warning("日线数据采集任务部分完成") + + except ImportError as e: + task_result["status"] = "failed" + task_result["error"] = f"导入采集器失败: {e}" + logger.error(f"导入采集器失败: {e}") + + except Exception as e: + task_result["status"] = "error" + task_result["error"] = str(e) + logger.error(f"日线数据采集任务异常: {e}") + + self.task_status["tasks"]["daily_data"] = task_result + + return task_result + + def run_financial_data_task(self, test_mode: bool = True) -> Dict: + """运行财务数据采集任务 + + Args: + test_mode: 测试模式(少量数据) + + Returns: + Dict: 任务执行结果 + """ + logger.info("开始财务数据采集任务") + + task_result = { + "task_name": "financial_data", + "start_time": datetime.now().isoformat(), + "status": "running", + "result": None + } + + try: + # 导入财务数据采集器 + sys.path.append(self.scripts_dir) + + try: + from data_acquisition.a_stock_financial_data import AStockFinancialDataCollector + + # 创建采集器 + collector = AStockFinancialDataCollector() + + # 获取股票列表 + stock_list = collector.load_stock_list() + + if not stock_list: + task_result["status"] = "failed" + task_result["error"] = "未获取到股票列表" + logger.error("未获取到股票列表") + else: + # 测试模式下只处理少量股票 + if test_mode: + test_stocks = stock_list[:10] + logger.info(f"测试模式: 只处理 {len(test_stocks)} 只股票") + else: + test_stocks = stock_list + + # 运行采集 + result = collector.batch_collect_financial_data( + stock_list=test_stocks, + max_workers=3 + ) + + task_result["result"] = result + task_result["end_time"] = datetime.now().isoformat() + + if result.get("success", False): + task_result["status"] = "success" + logger.info("财务数据采集任务成功") + else: + task_result["status"] = "partial" + logger.warning("财务数据采集任务部分完成") + + except ImportError as e: + task_result["status"] = "failed" + task_result["error"] = f"导入采集器失败: {e}" + logger.error(f"导入采集器失败: {e}") + + except Exception as e: + task_result["status"] = "error" + task_result["error"] = str(e) + logger.error(f"财务数据采集任务异常: {e}") + + self.task_status["tasks"]["financial_data"] = task_result + + return task_result + + def run_quality_check_task(self) -> Dict: + """运行质量检查任务 + + Returns: + Dict: 任务执行结果 + """ + logger.info("开始数据质量检查任务") + + task_result = { + "task_name": "quality_check", + "start_time": datetime.now().isoformat(), + "status": "running", + "result": None + } + + try: + # 导入质量检查器 + sys.path.append(self.scripts_dir) + + try: + from data_quality.data_quality_manager import AStockDataQualityManager + + # 创建质量管理器 + quality_manager = AStockDataQualityManager() + + # 运行质量检查 + quality_results = quality_manager.run_quality_checks() + + # 运行更新检查 + update_results = quality_manager.run_update_check() + + combined_result = { + "quality_check": quality_results, + "update_check": update_results + } + + task_result["result"] = combined_result + task_result["end_time"] = datetime.now().isoformat() + + if quality_results["overall_status"] not in ["error", "critical"]: + task_result["status"] = "success" + logger.info("数据质量检查任务成功") + else: + task_result["status"] = "warning" + logger.warning("数据质量检查发现问题") + + except ImportError as e: + task_result["status"] = "failed" + task_result["error"] = f"导入质量管理器失败: {e}" + logger.error(f"导入质量管理器失败: {e}") + + except Exception as e: + task_result["status"] = "error" + task_result["error"] = str(e) + logger.error(f"数据质量检查任务异常: {e}") + + self.task_status["tasks"]["quality_check"] = task_result + + return task_result + + def run_full_pipeline(self, test_mode: bool = True) -> Dict: + """运行完整的数据采集流水线 + + Args: + test_mode: 测试模式 + + Returns: + Dict: 完整执行结果 + """ + logger.info("开始运行完整A股数据采集流水线") + + pipeline_start_time = datetime.now() + + # 1. 基础信息采集 + basic_info_result = self.run_basic_info_task(test_mode) + + # 检查基础信息是否成功,如果失败则停止 + if basic_info_result["status"] in ["failed", "error"]: + logger.error("基础信息采集失败,停止流水线") + self._finalize_task_status() + return self.task_status + + # 等待一下,避免请求过快 + time.sleep(5) + + # 2. 日线数据采集 + daily_data_result = self.run_daily_data_task(test_mode) + + # 等待一下 + time.sleep(5) + + # 3. 财务数据采集 + financial_data_result = self.run_financial_data_task(test_mode) + + # 4. 质量检查 + quality_check_result = self.run_quality_check_task() + + # 计算总体执行时间 + pipeline_end_time = datetime.now() + execution_time = (pipeline_end_time - pipeline_start_time).total_seconds() + + # 生成汇总报告 + self._generate_summary_report(execution_time) + + # 最终化任务状态 + self._finalize_task_status() + + logger.info(f"完整数据采集流水线完成,总耗时: {execution_time:.2f}秒") + + return self.task_status + + def _generate_summary_report(self, execution_time: float): + """生成汇总报告 + + Args: + execution_time: 执行时间(秒) + """ + try: + # 统计任务结果 + total_tasks = len(self.task_status["tasks"]) + success_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] == "success") + failed_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] in ["failed", "error"]) + partial_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] == "partial") + warning_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] == "warning") + + # 收集关键指标 + data_summary = { + "basic_info_records": 0, + "daily_data_records": 0, + "financial_data_records": 0 + } + + # 从各任务结果中提取数据 + for task_name, task_result in self.task_status["tasks"].items(): + result = task_result.get("result", {}) + + if task_name == "basic_info" and result: + data_summary["basic_info_records"] = result.get("records_collected", 0) + elif task_name == "daily_data" and result: + data_summary["daily_data_records"] = result.get("total_records", 0) + elif task_name == "financial_data" and result: + data_summary["financial_data_records"] = result.get("indicators_records", 0) + + # 创建汇总报告 + summary = { + "task_id": self.task_id, + "execution_time_seconds": execution_time, + "task_summary": { + "total_tasks": total_tasks, + "success_tasks": success_tasks, + "partial_tasks": partial_tasks, + "warning_tasks": warning_tasks, + "failed_tasks": failed_tasks, + "success_rate": f"{success_tasks / total_tasks * 100:.1f}%" if total_tasks > 0 else "0%" + }, + "data_summary": data_summary, + "quality_status": "unknown", + "recommendations": [] + } + + # 质量状态 + quality_task = self.task_status["tasks"].get("quality_check") + if quality_task and quality_task.get("result"): + quality_result = quality_task["result"].get("quality_check", {}) + summary["quality_status"] = quality_result.get("overall_status", "unknown") + + # 生成建议 + if failed_tasks > 0: + summary["recommendations"].append("重新运行失败的任务") + + if partial_tasks > 0: + summary["recommendations"].append("检查部分完成的任务,进行补充采集") + + if warning_tasks > 0: + summary["recommendations"].append("检查警告信息,优化数据质量") + + if data_summary["basic_info_records"] == 0: + summary["recommendations"].append("基础信息数据采集可能存在问题") + + summary["recommendations"].append("检查数据目录结构,确保数据存储正确") + summary["recommendations"].append("建立定期更新机制,保持数据新鲜度") + + self.task_status["summary"] = summary + + # 保存汇总报告 + report_file = os.path.join(self.reports_dir, f"full_pipeline_summary_{self.task_id}.json") + with open(report_file, 'w', encoding='utf-8') as f: + json.dump(summary, f, ensure_ascii=False, indent=2) + + logger.info(f"汇总报告已保存: {report_file}") + + except Exception as e: + logger.error(f"生成汇总报告失败: {e}") + + def _finalize_task_status(self): + """最终化任务状态""" + self.task_status["end_time"] = datetime.now().isoformat() + + # 确定总体状态 + task_statuses = [task["status"] for task in self.task_status["tasks"].values()] + + if "error" in task_statuses or len([s for s in task_statuses if s in ["failed", "error"]]) > 1: + self.task_status["overall_status"] = "failed" + elif "failed" in task_statuses: + self.task_status["overall_status"] = "partial" + elif "warning" in task_statuses: + self.task_status["overall_status"] = "warning" + elif all(status == "success" for status in task_statuses): + self.task_status["overall_status"] = "success" + else: + self.task_status["overall_status"] = "partial" + + # 保存任务状态 + status_file = os.path.join(self.reports_dir, f"task_status_{self.task_id}.json") + with open(status_file, 'w', encoding='utf-8') as f: + json.dump(self.task_status, f, ensure_ascii=False, indent=2) + + logger.info(f"任务状态已保存: {status_file}") + + +def main(): + """主函数""" + print("=" * 80) + print("📊 A股数据准备 - 综合主控程序") + print("=" * 80) + + print("版本: 1.0.0") + print("功能:") + print(" 1. A股基础信息采集") + print(" 2. A股日线行情数据采集") + print(" 3. A股财务数据采集") + print(" 4. 数据质量检查") + print(" 5. 完整数据流水线执行") + print("=" * 80) + + # 创建主控制器 + controller = AStockDataMainController() + + # 用户选择执行模式 + print("\n请选择执行模式:") + print(" 1. 测试模式(少量数据,快速验证)") + print(" 2. 完整模式(全量数据,需要较长时间)") + print(" 3. 仅运行质量检查") + print(" 4. 仅运行基础信息采集") + print(" 5. 退出") + + try: + choice = input("\n请输入选择 (1-5): ").strip() + + if choice == "1": + print("\n🔧 执行测试模式...") + test_mode = True + result = controller.run_full_pipeline(test_mode) + + elif choice == "2": + print("\n🚀 执行完整模式(警告:需要较长时间)...") + confirm = input("确定执行完整模式吗?(y/N): ").strip().lower() + + if confirm == 'y': + test_mode = False + result = controller.run_full_pipeline(test_mode) + else: + print("操作已取消") + return + + elif choice == "3": + print("\n🔍 执行数据质量检查...") + result = controller.run_quality_check_task() + + elif choice == "4": + print("\n📋 执行基础信息采集...") + result = controller.run_basic_info_task(test_mode=True) + + elif choice == "5": + print("\n退出程序") + return + + else: + print(f"\n❌ 无效选择: {choice}") + return + + # 输出执行结果 + print("\n" + "=" * 80) + print("📋 执行结果") + print("=" * 80) + + overall_status = controller.task_status["overall_status"] + + if overall_status == "success": + print("✅ 任务执行成功!") + elif overall_status == "partial": + print("⚠️ 任务部分完成") + elif overall_status == "warning": + print("⚠️ 任务完成但有警告") + elif overall_status == "failed": + print("❌ 任务执行失败") + else: + print(f"❓ 任务状态: {overall_status}") + + # 输出任务详情 + print(f"\n📊 任务详情:") + for task_name, task_result in controller.task_status["tasks"].items(): + status = task_result["status"] + start_time = datetime.fromisoformat(task_result["start_time"]) + end_time = datetime.fromisoformat(task_result["end_time"]) if task_result.get("end_time") else None + + time_taken = "N/A" + if end_time: + time_taken = f"{(end_time - start_time).total_seconds():.1f}秒" + + status_symbol = { + "success": "✅", + "partial": "⚠️", + "warning": "⚠️", + "failed": "❌", + "error": "❌", + "running": "🔄" + }.get(status, "❓") + + print(f" {status_symbol} {task_name}: {status} ({time_taken})") + + # 输出数据摘要 + summary = controller.task_status.get("summary", {}) + if summary: + print(f"\n📈 数据摘要:") + data_summary = summary.get("data_summary", {}) + print(f" 基础信息记录: {data_summary.get('basic_info_records', 0)}") + print(f" 日线数据记录: {data_summary.get('daily_data_records', 0)}") + print(f" 财务数据记录: {data_summary.get('financial_data_records', 0)}") + + print(f"\n⏱️ 执行统计:") + exec_summary = summary.get("task_summary", {}) + print(f" 总任务数: {exec_summary.get('total_tasks', 0)}") + print(f" 成功任务: {exec_summary.get('success_tasks', 0)}") + print(f" 成功率: {exec_summary.get('success_rate', '0%')}") + print(f" 总耗时: {summary.get('execution_time_seconds', 0):.1f}秒") + + # 输出质量状态 + if summary.get("quality_status"): + print(f"\n🔍 质量状态: {summary['quality_status'].upper()}") + + # 输出建议 + recommendations = summary.get("recommendations", []) + if recommendations: + print(f"\n💡 建议:") + for rec in recommendations[:5]: # 只显示前5条 + print(f" • {rec}") + + # 输出报告文件位置 + print(f"\n📄 报告文件位置:") + print(f" 任务目录: {controller.base_dir}") + print(f" 数据目录: {controller.data_dir}") + print(f" 报告目录: {controller.reports_dir}") + + print("\n" + "=" * 80) + print("🎯 赵云数据工程将军 - 任务完成") + print("=" * 80) + + except KeyboardInterrupt: + print("\n\n⚠️ 用户中断程序") + except Exception as e: + print(f"\n❌ 程序执行异常: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() \ No newline at end of file