auto-sync: 2026-04-17 20:23:13

This commit is contained in:
cfdaily
2026-04-17 20:23:14 +08:00
parent 6f03816614
commit e74b4f41e8
@@ -0,0 +1,248 @@
#!/usr/bin/env python3
# 获取A股股票列表 - 循环回流测试任务
# 包含:股票代码、股票名称、当前价格
# 任务ID: circulation-test-002
# 执行人: 赵云(数据护军)
import sys
import os
import pandas as pd
import numpy as np
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 添加上级目录到路径,以便导入common_tools
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from common_tools.akshare_vnpy_adapter import AKShareDataAdapter
class AStockListFetcher:
"""A股股票列表获取器"""
def __init__(self):
"""初始化"""
self.adapter = AKShareDataAdapter()
self.ak = self.adapter.ak
self.akshare_available = self.adapter.akshare_available
def get_all_a_stocks(self) -> pd.DataFrame:
"""获取所有A股股票列表
Returns:
pd.DataFrame: 包含代码、名称、当前价格的DataFrame
"""
logger.info("开始获取A股股票列表...")
try:
if self.akshare_available:
# 使用akshare获取A股股票列表
stocks_df = self.ak.stock_zh_a_spot()
logger.info(f"成功获取A股股票列表,共 {len(stocks_df)} 只股票")
# 整理列名,选择需要的字段
# akshare返回的列名:代码,名称,最新价,涨跌幅,涨跌额,买入,卖出,成交量,成交额,开盘,最高,最低,昨收
result_df = pd.DataFrame()
result_df['code'] = stocks_df['代码']
result_df['name'] = stocks_df['名称']
result_df['current_price'] = stocks_df['最新价']
result_df['change_percent'] = stocks_df['涨跌幅']
result_df['change_amount'] = stocks_df['涨跌额']
result_df['volume'] = stocks_df['成交量']
result_df['amount'] = stocks_df['成交额']
result_df['open'] = stocks_df['开盘']
result_df['high'] = stocks_df['最高']
result_df['low'] = stocks_df['最低']
result_df['pre_close'] = stocks_df['昨收']
else:
# AKShare不可用,生成模拟数据
logger.warning("AKShare不可用,生成模拟测试数据")
result_df = self._generate_mock_data()
# 去重处理(防止重复)
result_df = result_df.drop_duplicates(subset=['code'], keep='first')
# 按代码排序
result_df = result_df.sort_values('code').reset_index(drop=True)
logger.info(f"A股股票列表处理完成,最终 {len(result_df)} 只股票")
return result_df
except Exception as e:
logger.error(f"获取A股股票列表失败: {e}")
# 返回空DataFrame
return pd.DataFrame()
def _generate_mock_data(self) -> pd.DataFrame:
"""生成模拟数据用于测试
Returns:
pd.DataFrame: 模拟股票数据
"""
# 一些代表性的股票作为模拟数据
mock_stocks = [
{'code': '000001', 'name': '平安银行', 'current_price': 11.25},
{'code': '000002', 'name': '万科A', 'current_price': 12.38},
{'code': '002594', 'name': '比亚迪', 'current_price': 235.60},
{'code': '600000', 'name': '浦发银行', 'current_price': 7.89},
{'code': '600519', 'name': '贵州茅台', 'current_price': 1688.00},
{'code': '601318', 'name': '中国平安', 'current_price': 42.35},
{'code': '601899', 'name': '紫金矿业', 'current_price': 10.26},
{'code': '600036', 'name': '招商银行', 'current_price': 31.28},
{'code': '000858', 'name': '五粮液', 'current_price': 158.60},
{'code': '300750', 'name': '宁德时代', 'current_price': 288.50},
]
df = pd.DataFrame(mock_stocks)
# 添加其他字段
df['change_percent'] = np.random.uniform(-5.0, 5.0, len(df))
df['change_amount'] = df['current_price'] * df['change_percent'] / 100
df['volume'] = np.random.uniform(1000000, 100000000, len(df))
df['amount'] = df['volume'] * df['current_price']
df['open'] = df['current_price'] * np.random.uniform(0.98, 1.02, len(df))
df['high'] = df['open'] * np.random.uniform(1.0, 1.05, len(df))
df['low'] = df['open'] * np.random.uniform(0.95, 1.0, len(df))
df['pre_close'] = df['current_price'] - df['change_amount']
return df
def filter_by_market(self, df: pd.DataFrame, market: str) -> pd.DataFrame:
"""按市场筛选股票
Args:
df: 原始股票列表
market: 市场类型,'sh'表示沪市,'sz'表示深市,'cyb'表示创业板,'kc'表示科创板
Returns:
pd.DataFrame: 筛选后的结果
"""
if df.empty:
return df
if market == 'sh':
# 沪市:6开头
filtered = df[df['code'].str.startswith('6')]
elif market == 'sz':
# 深市主板:0开头且不是002、003
filtered = df[df['code'].str.startswith('0') & ~df['code'].str.startswith(('002', '003'))]
elif market == 'cyb':
# 创业板:3开头
filtered = df[df['code'].str.startswith('3')]
elif market == 'kc':
# 科创板:688开头
filtered = df[df['code'].str.startswith('688')]
else:
# 返回全部
filtered = df
logger.info(f"按市场 [{market}] 筛选后得到 {len(filtered)} 只股票")
return filtered
def save_to_csv(self, df: pd.DataFrame, output_path: str = None) -> str:
"""保存数据到CSV文件
Args:
df: 股票数据DataFrame
output_path: 输出文件路径,如果为None则自动生成
Returns:
str: 保存的文件路径
"""
if df.empty:
logger.warning("数据为空,跳过保存")
return ""
if output_path is None:
# 自动生成输出路径
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
'data', 'processed', 'stock_list'
)
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"a_stock_list_{timestamp}.csv")
# 保存为CSV
df.to_csv(output_path, index=False, encoding='utf-8-sig')
logger.info(f"A股股票列表已保存到: {output_path}")
return output_path
def save_to_json(self, df: pd.DataFrame, output_path: str = None) -> str:
"""保存数据到JSON文件
Args:
df: 股票数据DataFrame
output_path: 输出文件路径,如果为None则自动生成
Returns:
str: 保存的文件路径
"""
if df.empty:
logger.warning("数据为空,跳过保存")
return ""
if output_path is None:
# 自动生成输出路径
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
'data', 'processed', 'stock_list'
)
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"a_stock_list_{timestamp}.json")
# 转换为字典并保存
data = {
'fetch_time': datetime.now().isoformat(),
'total_count': len(df),
'stocks': df.to_dict(orient='records')
}
with open(output_path, 'w', encoding='utf-8') as f:
import json
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"A股股票列表已保存到: {output_path}")
return output_path
def main():
"""主函数,执行获取A股股票列表"""
fetcher = AStockListFetcher()
# 获取全部A股股票
all_stocks = fetcher.get_all_a_stocks()
if all_stocks.empty:
logger.error("获取A股股票列表失败,退出程序")
sys.exit(1)
# 打印统计信息
print(f"\n获取成功!共获取 {len(all_stocks)} 只A股股票:")
print(f"- 沪市主板: {len(all_stocks[all_stocks['code'].str.startswith('6') & ~all_stocks['code'].str.startswith('688')])}")
print(f"- 科创板: {len(all_stocks[all_stocks['code'].str.startswith('688')])}")
print(f"- 深市主板: {len(all_stocks[all_stocks['code'].str.startswith('0') & ~all_stocks['code'].str.startswith(('002', '003', '00'))])}")
print(f"- 中小板: {len(all_stocks[all_stocks['code'].str.startswith(('002', '003'))])}")
print(f"- 创业板: {len(all_stocks[all_stocks['code'].str.startswith('3')])}")
# 显示前10条数据
print("\n前10条数据示例:")
print(all_stocks.head(10).to_string(index=False))
# 保存文件
csv_path = fetcher.save_to_csv(all_stocks)
json_path = fetcher.save_to_json(all_stocks)
print(f"\n文件已保存:")
print(f"- CSV: {csv_path}")
print(f"- JSON: {json_path}")
return 0
if __name__ == "__main__":
sys.exit(main())