auto-sync: 2026-03-26 11:46:42

This commit is contained in:
cfdaily
2026-03-26 11:46:42 +08:00
parent 9b36ca3003
commit d9465d53a0
@@ -0,0 +1,627 @@
#!/usr/bin/env python3
"""
A股数据准备综合主控脚本
统一管理基础信息、日线数据、财务数据采集
"""
import sys
import os
import time
import json
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional
import logging
import warnings
warnings.filterwarnings('ignore')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class AStockDataMainController:
"""A股数据主控制器"""
def __init__(self):
"""初始化控制器"""
logger.info("A股数据主控制器初始化")
# 基础路径
self.base_dir = "/Users/chufeng/.openclaw/sanguo_projects/sanguo_quant_live/zhaoyun-data"
self.data_dir = os.path.join(self.base_dir, "data")
self.scripts_dir = os.path.join(self.base_dir, "scripts")
self.reports_dir = os.path.join(self.base_dir, "reports")
# 确保目录存在
os.makedirs(self.reports_dir, exist_ok=True)
# 当前时间
self.start_time = datetime.now()
self.task_id = self.start_time.strftime('%Y%m%d_%H%M%S')
# 任务执行状态
self.task_status = {
"task_id": self.task_id,
"start_time": self.start_time.isoformat(),
"end_time": None,
"overall_status": "pending",
"tasks": {},
"summary": {}
}
logger.info(f"任务ID: {self.task_id}")
def run_basic_info_task(self, test_mode: bool = True) -> Dict:
"""运行基础信息采集任务
Args:
test_mode: 测试模式(少量数据)
Returns:
Dict: 任务执行结果
"""
logger.info("开始基础信息采集任务")
task_result = {
"task_name": "basic_info",
"start_time": datetime.now().isoformat(),
"status": "running",
"result": None
}
try:
# 导入基础信息采集器
sys.path.append(self.scripts_dir)
try:
from data_acquisition.a_stock_basic_info import AStockBasicInfoCollector
# 创建采集器
collector = AStockBasicInfoCollector()
# 运行采集
result = collector.run()
task_result["result"] = result
task_result["end_time"] = datetime.now().isoformat()
if result.get("success", False):
task_result["status"] = "success"
logger.info("基础信息采集任务成功")
else:
task_result["status"] = "failed"
logger.warning("基础信息采集任务失败")
except ImportError as e:
task_result["status"] = "failed"
task_result["error"] = f"导入采集器失败: {e}"
logger.error(f"导入采集器失败: {e}")
except Exception as e:
task_result["status"] = "error"
task_result["error"] = str(e)
logger.error(f"基础信息采集任务异常: {e}")
self.task_status["tasks"]["basic_info"] = task_result
return task_result
def run_daily_data_task(self, test_mode: bool = True) -> Dict:
"""运行日线数据采集任务
Args:
test_mode: 测试模式(少量数据)
Returns:
Dict: 任务执行结果
"""
logger.info("开始日线数据采集任务")
task_result = {
"task_name": "daily_data",
"start_time": datetime.now().isoformat(),
"status": "running",
"result": None
}
try:
# 导入日线数据采集器
sys.path.append(self.scripts_dir)
try:
from data_acquisition.a_stock_daily_data import AStockDailyDataCollector
# 创建采集器
collector = AStockDailyDataCollector()
# 获取股票列表
stock_list = collector.load_stock_list()
if not stock_list:
task_result["status"] = "failed"
task_result["error"] = "未获取到股票列表"
logger.error("未获取到股票列表")
else:
# 测试模式下只处理少量股票
if test_mode:
test_stocks = stock_list[:20]
logger.info(f"测试模式: 只处理 {len(test_stocks)} 只股票")
else:
test_stocks = stock_list
# 运行采集
result = collector.batch_collect_daily_data(
stock_list=test_stocks,
batch_size=10,
max_workers=5
)
task_result["result"] = result
task_result["end_time"] = datetime.now().isoformat()
if result.get("success", False):
task_result["status"] = "success"
logger.info("日线数据采集任务成功")
else:
task_result["status"] = "partial"
logger.warning("日线数据采集任务部分完成")
except ImportError as e:
task_result["status"] = "failed"
task_result["error"] = f"导入采集器失败: {e}"
logger.error(f"导入采集器失败: {e}")
except Exception as e:
task_result["status"] = "error"
task_result["error"] = str(e)
logger.error(f"日线数据采集任务异常: {e}")
self.task_status["tasks"]["daily_data"] = task_result
return task_result
def run_financial_data_task(self, test_mode: bool = True) -> Dict:
"""运行财务数据采集任务
Args:
test_mode: 测试模式(少量数据)
Returns:
Dict: 任务执行结果
"""
logger.info("开始财务数据采集任务")
task_result = {
"task_name": "financial_data",
"start_time": datetime.now().isoformat(),
"status": "running",
"result": None
}
try:
# 导入财务数据采集器
sys.path.append(self.scripts_dir)
try:
from data_acquisition.a_stock_financial_data import AStockFinancialDataCollector
# 创建采集器
collector = AStockFinancialDataCollector()
# 获取股票列表
stock_list = collector.load_stock_list()
if not stock_list:
task_result["status"] = "failed"
task_result["error"] = "未获取到股票列表"
logger.error("未获取到股票列表")
else:
# 测试模式下只处理少量股票
if test_mode:
test_stocks = stock_list[:10]
logger.info(f"测试模式: 只处理 {len(test_stocks)} 只股票")
else:
test_stocks = stock_list
# 运行采集
result = collector.batch_collect_financial_data(
stock_list=test_stocks,
max_workers=3
)
task_result["result"] = result
task_result["end_time"] = datetime.now().isoformat()
if result.get("success", False):
task_result["status"] = "success"
logger.info("财务数据采集任务成功")
else:
task_result["status"] = "partial"
logger.warning("财务数据采集任务部分完成")
except ImportError as e:
task_result["status"] = "failed"
task_result["error"] = f"导入采集器失败: {e}"
logger.error(f"导入采集器失败: {e}")
except Exception as e:
task_result["status"] = "error"
task_result["error"] = str(e)
logger.error(f"财务数据采集任务异常: {e}")
self.task_status["tasks"]["financial_data"] = task_result
return task_result
def run_quality_check_task(self) -> Dict:
"""运行质量检查任务
Returns:
Dict: 任务执行结果
"""
logger.info("开始数据质量检查任务")
task_result = {
"task_name": "quality_check",
"start_time": datetime.now().isoformat(),
"status": "running",
"result": None
}
try:
# 导入质量检查器
sys.path.append(self.scripts_dir)
try:
from data_quality.data_quality_manager import AStockDataQualityManager
# 创建质量管理器
quality_manager = AStockDataQualityManager()
# 运行质量检查
quality_results = quality_manager.run_quality_checks()
# 运行更新检查
update_results = quality_manager.run_update_check()
combined_result = {
"quality_check": quality_results,
"update_check": update_results
}
task_result["result"] = combined_result
task_result["end_time"] = datetime.now().isoformat()
if quality_results["overall_status"] not in ["error", "critical"]:
task_result["status"] = "success"
logger.info("数据质量检查任务成功")
else:
task_result["status"] = "warning"
logger.warning("数据质量检查发现问题")
except ImportError as e:
task_result["status"] = "failed"
task_result["error"] = f"导入质量管理器失败: {e}"
logger.error(f"导入质量管理器失败: {e}")
except Exception as e:
task_result["status"] = "error"
task_result["error"] = str(e)
logger.error(f"数据质量检查任务异常: {e}")
self.task_status["tasks"]["quality_check"] = task_result
return task_result
def run_full_pipeline(self, test_mode: bool = True) -> Dict:
"""运行完整的数据采集流水线
Args:
test_mode: 测试模式
Returns:
Dict: 完整执行结果
"""
logger.info("开始运行完整A股数据采集流水线")
pipeline_start_time = datetime.now()
# 1. 基础信息采集
basic_info_result = self.run_basic_info_task(test_mode)
# 检查基础信息是否成功,如果失败则停止
if basic_info_result["status"] in ["failed", "error"]:
logger.error("基础信息采集失败,停止流水线")
self._finalize_task_status()
return self.task_status
# 等待一下,避免请求过快
time.sleep(5)
# 2. 日线数据采集
daily_data_result = self.run_daily_data_task(test_mode)
# 等待一下
time.sleep(5)
# 3. 财务数据采集
financial_data_result = self.run_financial_data_task(test_mode)
# 4. 质量检查
quality_check_result = self.run_quality_check_task()
# 计算总体执行时间
pipeline_end_time = datetime.now()
execution_time = (pipeline_end_time - pipeline_start_time).total_seconds()
# 生成汇总报告
self._generate_summary_report(execution_time)
# 最终化任务状态
self._finalize_task_status()
logger.info(f"完整数据采集流水线完成,总耗时: {execution_time:.2f}")
return self.task_status
def _generate_summary_report(self, execution_time: float):
"""生成汇总报告
Args:
execution_time: 执行时间(秒)
"""
try:
# 统计任务结果
total_tasks = len(self.task_status["tasks"])
success_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] == "success")
failed_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] in ["failed", "error"])
partial_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] == "partial")
warning_tasks = sum(1 for task in self.task_status["tasks"].values() if task["status"] == "warning")
# 收集关键指标
data_summary = {
"basic_info_records": 0,
"daily_data_records": 0,
"financial_data_records": 0
}
# 从各任务结果中提取数据
for task_name, task_result in self.task_status["tasks"].items():
result = task_result.get("result", {})
if task_name == "basic_info" and result:
data_summary["basic_info_records"] = result.get("records_collected", 0)
elif task_name == "daily_data" and result:
data_summary["daily_data_records"] = result.get("total_records", 0)
elif task_name == "financial_data" and result:
data_summary["financial_data_records"] = result.get("indicators_records", 0)
# 创建汇总报告
summary = {
"task_id": self.task_id,
"execution_time_seconds": execution_time,
"task_summary": {
"total_tasks": total_tasks,
"success_tasks": success_tasks,
"partial_tasks": partial_tasks,
"warning_tasks": warning_tasks,
"failed_tasks": failed_tasks,
"success_rate": f"{success_tasks / total_tasks * 100:.1f}%" if total_tasks > 0 else "0%"
},
"data_summary": data_summary,
"quality_status": "unknown",
"recommendations": []
}
# 质量状态
quality_task = self.task_status["tasks"].get("quality_check")
if quality_task and quality_task.get("result"):
quality_result = quality_task["result"].get("quality_check", {})
summary["quality_status"] = quality_result.get("overall_status", "unknown")
# 生成建议
if failed_tasks > 0:
summary["recommendations"].append("重新运行失败的任务")
if partial_tasks > 0:
summary["recommendations"].append("检查部分完成的任务,进行补充采集")
if warning_tasks > 0:
summary["recommendations"].append("检查警告信息,优化数据质量")
if data_summary["basic_info_records"] == 0:
summary["recommendations"].append("基础信息数据采集可能存在问题")
summary["recommendations"].append("检查数据目录结构,确保数据存储正确")
summary["recommendations"].append("建立定期更新机制,保持数据新鲜度")
self.task_status["summary"] = summary
# 保存汇总报告
report_file = os.path.join(self.reports_dir, f"full_pipeline_summary_{self.task_id}.json")
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"汇总报告已保存: {report_file}")
except Exception as e:
logger.error(f"生成汇总报告失败: {e}")
def _finalize_task_status(self):
"""最终化任务状态"""
self.task_status["end_time"] = datetime.now().isoformat()
# 确定总体状态
task_statuses = [task["status"] for task in self.task_status["tasks"].values()]
if "error" in task_statuses or len([s for s in task_statuses if s in ["failed", "error"]]) > 1:
self.task_status["overall_status"] = "failed"
elif "failed" in task_statuses:
self.task_status["overall_status"] = "partial"
elif "warning" in task_statuses:
self.task_status["overall_status"] = "warning"
elif all(status == "success" for status in task_statuses):
self.task_status["overall_status"] = "success"
else:
self.task_status["overall_status"] = "partial"
# 保存任务状态
status_file = os.path.join(self.reports_dir, f"task_status_{self.task_id}.json")
with open(status_file, 'w', encoding='utf-8') as f:
json.dump(self.task_status, f, ensure_ascii=False, indent=2)
logger.info(f"任务状态已保存: {status_file}")
def main():
"""主函数"""
print("=" * 80)
print("📊 A股数据准备 - 综合主控程序")
print("=" * 80)
print("版本: 1.0.0")
print("功能:")
print(" 1. A股基础信息采集")
print(" 2. A股日线行情数据采集")
print(" 3. A股财务数据采集")
print(" 4. 数据质量检查")
print(" 5. 完整数据流水线执行")
print("=" * 80)
# 创建主控制器
controller = AStockDataMainController()
# 用户选择执行模式
print("\n请选择执行模式:")
print(" 1. 测试模式(少量数据,快速验证)")
print(" 2. 完整模式(全量数据,需要较长时间)")
print(" 3. 仅运行质量检查")
print(" 4. 仅运行基础信息采集")
print(" 5. 退出")
try:
choice = input("\n请输入选择 (1-5): ").strip()
if choice == "1":
print("\n🔧 执行测试模式...")
test_mode = True
result = controller.run_full_pipeline(test_mode)
elif choice == "2":
print("\n🚀 执行完整模式(警告:需要较长时间)...")
confirm = input("确定执行完整模式吗?(y/N): ").strip().lower()
if confirm == 'y':
test_mode = False
result = controller.run_full_pipeline(test_mode)
else:
print("操作已取消")
return
elif choice == "3":
print("\n🔍 执行数据质量检查...")
result = controller.run_quality_check_task()
elif choice == "4":
print("\n📋 执行基础信息采集...")
result = controller.run_basic_info_task(test_mode=True)
elif choice == "5":
print("\n退出程序")
return
else:
print(f"\n❌ 无效选择: {choice}")
return
# 输出执行结果
print("\n" + "=" * 80)
print("📋 执行结果")
print("=" * 80)
overall_status = controller.task_status["overall_status"]
if overall_status == "success":
print("✅ 任务执行成功!")
elif overall_status == "partial":
print("⚠️ 任务部分完成")
elif overall_status == "warning":
print("⚠️ 任务完成但有警告")
elif overall_status == "failed":
print("❌ 任务执行失败")
else:
print(f"❓ 任务状态: {overall_status}")
# 输出任务详情
print(f"\n📊 任务详情:")
for task_name, task_result in controller.task_status["tasks"].items():
status = task_result["status"]
start_time = datetime.fromisoformat(task_result["start_time"])
end_time = datetime.fromisoformat(task_result["end_time"]) if task_result.get("end_time") else None
time_taken = "N/A"
if end_time:
time_taken = f"{(end_time - start_time).total_seconds():.1f}"
status_symbol = {
"success": "",
"partial": "⚠️",
"warning": "⚠️",
"failed": "",
"error": "",
"running": "🔄"
}.get(status, "")
print(f" {status_symbol} {task_name}: {status} ({time_taken})")
# 输出数据摘要
summary = controller.task_status.get("summary", {})
if summary:
print(f"\n📈 数据摘要:")
data_summary = summary.get("data_summary", {})
print(f" 基础信息记录: {data_summary.get('basic_info_records', 0)}")
print(f" 日线数据记录: {data_summary.get('daily_data_records', 0)}")
print(f" 财务数据记录: {data_summary.get('financial_data_records', 0)}")
print(f"\n⏱️ 执行统计:")
exec_summary = summary.get("task_summary", {})
print(f" 总任务数: {exec_summary.get('total_tasks', 0)}")
print(f" 成功任务: {exec_summary.get('success_tasks', 0)}")
print(f" 成功率: {exec_summary.get('success_rate', '0%')}")
print(f" 总耗时: {summary.get('execution_time_seconds', 0):.1f}")
# 输出质量状态
if summary.get("quality_status"):
print(f"\n🔍 质量状态: {summary['quality_status'].upper()}")
# 输出建议
recommendations = summary.get("recommendations", [])
if recommendations:
print(f"\n💡 建议:")
for rec in recommendations[:5]: # 只显示前5条
print(f"{rec}")
# 输出报告文件位置
print(f"\n📄 报告文件位置:")
print(f" 任务目录: {controller.base_dir}")
print(f" 数据目录: {controller.data_dir}")
print(f" 报告目录: {controller.reports_dir}")
print("\n" + "=" * 80)
print("🎯 赵云数据工程将军 - 任务完成")
print("=" * 80)
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断程序")
except Exception as e:
print(f"\n❌ 程序执行异常: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()