# Source path: sanguo_quant_live/zhaoyun-data/scripts/data_acquisition/batch_downloader.py

#!/usr/bin/env python3
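# Batch data downloader - Zhaoyun data-engineering tool.
# Used to bulk-download JoinQuant articles, financial data, and similar resources.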
import requests
import time
import json
import os
from typing import List, Dict, Optional
import logging
from datetime import datetime
class BatchDownloader:
    """Batch downloader for JoinQuant articles and financial data.

    Every artifact is written as a UTF-8 JSON file under ``output_dir``.

    Attributes:
        output_dir: Directory that receives all downloaded files.
        session: Shared ``requests.Session`` carrying a browser-like
            User-Agent header.
        logger: Logger named after this module.
    """

    def __init__(self, output_dir: str = "./data/raw"):
        """Create the output directory and set up the HTTP session and logging.

        Args:
            output_dir: Directory where downloads are stored; created if it
                does not exist yet.
        """
        self.output_dir = output_dir
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        # Make sure the output directory exists before anything is written.
        os.makedirs(output_dir, exist_ok=True)
        # basicConfig does nothing when the root logger already has handlers,
        # so a logging setup done by the host application is left intact.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def download_jq_articles(self, article_links: List[str], delay: float = 1.0) -> Dict:
        """Download a list of JoinQuant articles, one JSON file per article.

        A failure on one link is logged and counted; the remaining links are
        still processed. Article ids are positional (``article_001``, ...), so
        a later call with the same ``output_dir`` overwrites earlier files
        that share an index.

        Args:
            article_links: URLs of the articles to fetch.
            delay: Seconds to wait between consecutive requests. Values that
                are zero or negative skip the wait.

        Returns:
            Dict with ``total``, ``success`` and ``failed`` counters plus
            ``articles``, a list holding one record (``id``, ``url``,
            ``save_path``, ``timestamp``) per successful download.
        """
        total = len(article_links)
        results = {
            'total': total,
            'success': 0,
            'failed': 0,
            'articles': []
        }
        for i, link in enumerate(article_links, 1):
            try:
                self.logger.info(f"下载文章 {i}/{total}: {link}")
                article_id = f"article_{i:03d}"
                record = self._fetch_and_save_article(link, article_id)
                results['success'] += 1
                results['articles'].append(record)
                self.logger.info(f"文章 {article_id} 下载成功")
            except Exception as e:
                # Best effort: one bad link must not abort the whole batch.
                self.logger.error(f"下载失败 {link}: {e}")
                results['failed'] += 1
            # Throttle between requests, but not after the last one.
            # time.sleep() raises ValueError for negative values, so a
            # non-positive delay simply means "no wait".
            if i < total and delay > 0:
                time.sleep(delay)
        return results

    def _fetch_and_save_article(self, link: str, article_id: str) -> Dict:
        """Fetch one article, parse it and store it as ``<article_id>.json``.

        Args:
            link: URL of the article.
            article_id: Identifier used as the output file stem.

        Returns:
            Record with ``id``, ``url``, ``save_path`` and ``timestamp``.

        Raises:
            Exception: Whatever the HTTP request, ``raise_for_status`` or the
                file write raises; callers decide how to report it.
        """
        response = self.session.get(link, timeout=10)
        response.raise_for_status()
        article_data = self._parse_jq_article(response.text)
        save_path = os.path.join(self.output_dir, f"{article_id}.json")
        with open(save_path, 'w', encoding='utf-8') as f:
            json.dump(article_data, f, ensure_ascii=False, indent=2)
        return {
            'id': article_id,
            'url': link,
            'save_path': save_path,
            'timestamp': datetime.now().isoformat()
        }

    def _parse_jq_article(self, html_content: str) -> Dict:
        """Build the stored record for one article.

        Placeholder implementation: ``html_content`` is not inspected yet; the
        title is derived from the current time and the content is a fixed
        marker string.

        Args:
            html_content: Raw HTML of the article page (currently unused).

        Returns:
            Dict with ``title``, ``content`` and ``metadata`` (``source``,
            ``crawl_time``, ``status``).
        """
        # Take the timestamp once so the title and crawl_time agree.
        now = datetime.now()
        return {
            'title': f"聚宽文章 - {now.strftime('%Y%m%d_%H%M%S')}",
            'content': "文章内容解析逻辑待实现",
            'metadata': {
                'source': 'joinquant',
                'crawl_time': now.isoformat(),
                'status': 'raw'
            }
        }

    def download_financial_data(self, symbols: List[str], start_date: str, end_date: str) -> Dict:
        """Write one JSON file per symbol for the requested date range.

        Placeholder implementation: no data source is queried; each file
        contains the request parameters and an empty ``data`` list.

        Args:
            symbols: Stock codes to process.
            start_date: Start of the range; used verbatim in the file name.
            end_date: End of the range; used verbatim in the file name.

        Returns:
            Dict keyed by symbol. Each value is either
            ``{'status': 'success', 'save_path': ...}`` or
            ``{'status': 'failed', 'error': ...}``.
        """
        results = {}
        for symbol in symbols:
            try:
                self.logger.info(f"下载金融数据: {symbol}")
                # A real data source (e.g. akshare, tushare) can be plugged in
                # here; for now only a skeleton record is produced.
                data = {
                    'symbol': symbol,
                    'start_date': start_date,
                    'end_date': end_date,
                    'data': []
                }
                save_path = os.path.join(
                    self.output_dir,
                    f"financial_{symbol}_{start_date}_{end_date}.json"
                )
                with open(save_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                results[symbol] = {
                    'status': 'success',
                    'save_path': save_path
                }
            except Exception as e:
                # Record the failure per symbol and keep going.
                self.logger.error(f"下载金融数据失败 {symbol}: {e}")
                results[symbol] = {
                    'status': 'failed',
                    'error': str(e)
                }
        return results

    def resume_download(self, log_file: str) -> Dict:
        """Retry the downloads that a previous run logged as failed.

        The log file is JSON with an ``items`` list; entries whose ``status``
        is ``'failed'`` are retried. An item counts as resumed only when its
        retry actually completes; items that cannot be retried or fail again
        are reported as still failed.

        NOTE(review): the log schema is not defined anywhere in this file.
        The retry assumes each failed item carries a ``url`` key and
        optionally an ``id`` key - confirm against whatever writes the log.

        Args:
            log_file: Path to the JSON download log.

        Returns:
            ``{'status': 'completed', 'failed': 0}`` when nothing failed;
            ``{'status': 'failed', 'error': ...}`` when the log cannot be read
            or processed; otherwise a dict with ``status``, ``total_failed``,
            ``resumed`` and ``still_failed``.
        """
        self.logger.info(f"尝试断点续传: {log_file}")
        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                log_data = json.load(f)
            failed_items = [
                item for item in log_data.get('items', [])
                if item.get('status') == 'failed'
            ]
            if not failed_items:
                self.logger.info("没有失败的下载项")
                return {'status': 'completed', 'failed': 0}
            self.logger.info(f"发现 {len(failed_items)} 个失败的下载项,尝试重新下载")
            success_count = 0
            for index, item in enumerate(failed_items, 1):
                try:
                    self._retry_failed_item(item, index)
                    # Only count the item once the retry really succeeded.
                    success_count += 1
                except Exception as e:
                    self.logger.error(f"重新下载失败: {e}")
            return {
                'status': f'resumed {success_count}/{len(failed_items)}',
                'total_failed': len(failed_items),
                'resumed': success_count,
                'still_failed': len(failed_items) - success_count
            }
        except Exception as e:
            self.logger.error(f"断点续传失败: {e}")
            return {'status': 'failed', 'error': str(e)}

    def _retry_failed_item(self, item: Dict, index: int) -> Dict:
        """Re-download a single failed log item.

        Args:
            item: Failed entry from the download log.
            index: 1-based position among the failed items; used to build a
                fallback id when the entry has none.

        Returns:
            The record produced by ``_fetch_and_save_article``.

        Raises:
            ValueError: If the entry has no ``url`` to retry.
            Exception: Anything raised while fetching or saving.
        """
        link = item.get('url')
        if not link:
            raise ValueError(f"失败项缺少 url 字段: {item!r}")
        # Fallback ids use a "retry_" prefix so they cannot collide with the
        # positional "article_NNN" files written by download_jq_articles.
        article_id = item.get('id') or f"retry_{index:03d}"
        return self._fetch_and_save_article(link, str(article_id))
def main():
    """Run a small demo: fetch two sample articles, then two sample symbols."""
    client = BatchDownloader()

    # Demo 1: JoinQuant community articles.
    sample_urls = [
        "https://www.joinquant.com/view/community/detail/12345",
        "https://www.joinquant.com/view/community/detail/67890"
    ]
    article_report = client.download_jq_articles(sample_urls)
    article_text = json.dumps(article_report, ensure_ascii=False, indent=2)
    print(f"下载结果: {article_text}")

    # Demo 2: per-symbol financial data for a fixed date range.
    sample_symbols = ['000001', '000002']
    finance_report = client.download_financial_data(
        sample_symbols, '2024-01-01', '2024-03-01'
    )
    finance_text = json.dumps(finance_report, ensure_ascii=False, indent=2)
    print(f"金融数据下载结果: {finance_text}")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()