#!/usr/bin/env python3
"""Fetch the A-share stock list (circulation/backflow test task).

Collects the stock code, name and latest price (plus basic quote fields) for
A-share stocks and writes the result to CSV and JSON.

Task ID: circulation-test-002
Owner: Zhao Yun (data guard)

Provenance: this module was added by patch e74b4f41
("auto-sync: 2026-04-17 20:23:13") as
zhaoyun-data/scripts/data_acquisition/get_a_stock_list.py
"""

import json
import logging
import math
import os
import sys
from datetime import datetime
from typing import Dict, Optional

import numpy as np
import pandas as pd

# Configure logging.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Add the parent directory to the import path so that `common_tools` can be
# imported (see AStockListFetcher.__init__).
_SCRIPTS_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _SCRIPTS_DIR not in sys.path:
    sys.path.append(_SCRIPTS_DIR)

# Output column -> candidate source column names, in order of preference.
# NOTE(review): the opening-price column is listed under both 开盘 and 今开
# because the label appears to differ between AKShare endpoints/versions --
# confirm against what the adapter actually returns.
_SPOT_COLUMN_ALIASES = {
    'code': ('代码',),
    'name': ('名称',),
    'current_price': ('最新价',),
    'change_percent': ('涨跌幅',),
    'change_amount': ('涨跌额',),
    'volume': ('成交量',),
    'amount': ('成交额',),
    'open': ('开盘', '今开'),
    'high': ('最高',),
    'low': ('最低',),
    'pre_close': ('昨收',),
}

# Columns without which the stock list is useless; all others are optional.
_REQUIRED_COLUMNS = ('code', 'name', 'current_price')


def _bare_codes(df: pd.DataFrame) -> pd.Series:
    """Return the `code` column as strings with any leading letters removed.

    Prefix matching is done on this series so that symbols carrying an
    exchange prefix (e.g. ``sh600000``) classify the same way as plain
    six-digit codes. The frame itself is not modified.
    """
    return df['code'].astype(str).str.replace(r'^[A-Za-z]+', '', regex=True)


def _json_safe(value):
    """Map missing / non-finite scalars to None so they serialize as JSON null."""
    if isinstance(value, float):
        return value if math.isfinite(value) else None
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def count_by_board(df: pd.DataFrame) -> Dict[str, int]:
    """Count stocks per board using code prefixes.

    Args:
        df: Stock list with a `code` column.

    Returns:
        Mapping of board label to number of stocks, in display order:
        Shanghai main board (6xx except 688), STAR market (688), Shenzhen
        main board (0xx except 002/003), SME board (002/003), ChiNext (3xx).
    """
    codes = _bare_codes(df)
    is_star = codes.str.startswith('688')
    is_sme = codes.str[:3].isin(('002', '003'))
    return {
        '沪市主板': int((codes.str.startswith('6') & ~is_star).sum()),
        '科创板': int(is_star.sum()),
        '深市主板': int((codes.str.startswith('0') & ~is_sme).sum()),
        '中小板': int(is_sme.sum()),
        '创业板': int(codes.str.startswith('3').sum()),
    }


class AStockListFetcher:
    """Fetcher for the A-share stock list."""

    def __init__(self):
        """Create the AKShare adapter and cache its handles.

        Raises:
            ImportError: if `common_tools.akshare_vnpy_adapter` cannot be
                imported.
        """
        # Imported here rather than at module level so that the pure-pandas
        # helpers in this module stay importable (and testable) on machines
        # where the project-local adapter is not on the path.
        from common_tools.akshare_vnpy_adapter import AKShareDataAdapter

        self.adapter = AKShareDataAdapter()
        self.ak = self.adapter.ak
        self.akshare_available = self.adapter.akshare_available

    @staticmethod
    def _normalize_spot_columns(stocks_df: pd.DataFrame) -> pd.DataFrame:
        """Select and rename the raw spot-quote columns to the output schema.

        Raises:
            KeyError: if a required column (code, name, latest price) is
                missing. Missing optional columns are logged and omitted.
        """
        sources = []
        targets = []
        missing = []
        for target, candidates in _SPOT_COLUMN_ALIASES.items():
            source = next((c for c in candidates if c in stocks_df.columns), None)
            if source is None:
                missing.append(target)
            else:
                sources.append(source)
                targets.append(target)

        missing_required = [name for name in missing if name in _REQUIRED_COLUMNS]
        if missing_required:
            raise KeyError(f"行情数据缺少必需字段: {missing_required}")
        if missing:
            logger.warning(f"行情数据缺少可选字段,已跳过: {missing}")

        result_df = stocks_df[sources].copy()
        result_df.columns = targets
        return result_df

    def get_all_a_stocks(self) -> pd.DataFrame:
        """Fetch the full A-share stock list.

        Uses `self.ak.stock_zh_a_spot()` when `self.akshare_available` is
        true, otherwise falls back to generated mock data. The result is
        de-duplicated and sorted by `code`.

        Returns:
            pd.DataFrame: code, name, current price and the other quote
            fields that were available; an empty DataFrame on any failure.
        """
        logger.info("开始获取A股股票列表...")

        try:
            if self.akshare_available:
                # Fetch the A-share spot quotes through akshare.
                stocks_df = self.ak.stock_zh_a_spot()
                logger.info(f"成功获取A股股票列表,共 {len(stocks_df)} 只股票")
                result_df = self._normalize_spot_columns(stocks_df)
            else:
                # AKShare is unavailable: generate mock data instead.
                logger.warning("AKShare不可用,生成模拟测试数据")
                result_df = self._generate_mock_data()

            # Codes are handled as strings everywhere downstream.
            result_df['code'] = result_df['code'].astype(str)
            # Drop duplicates, then sort by code.
            result_df = result_df.drop_duplicates(subset=['code'], keep='first')
            result_df = result_df.sort_values('code').reset_index(drop=True)

            logger.info(f"A股股票列表处理完成,最终 {len(result_df)} 只股票")
            return result_df

        except Exception as e:
            # Top-level boundary: callers rely on an empty frame to signal
            # failure, so log with traceback and return one.
            logger.exception(f"获取A股股票列表失败: {e}")
            return pd.DataFrame()

    def _generate_mock_data(self, seed: Optional[int] = None) -> pd.DataFrame:
        """Generate mock quote data for testing.

        Args:
            seed: Optional seed for reproducible output. When None the global
                `np.random` state is used.

        Returns:
            pd.DataFrame: ten representative stocks with internally
            consistent random quote fields.
        """
        rng = np.random if seed is None else np.random.RandomState(seed)

        # A few representative stocks used as mock data.
        mock_stocks = [
            {'code': '000001', 'name': '平安银行', 'current_price': 11.25},
            {'code': '000002', 'name': '万科A', 'current_price': 12.38},
            {'code': '002594', 'name': '比亚迪', 'current_price': 235.60},
            {'code': '600000', 'name': '浦发银行', 'current_price': 7.89},
            {'code': '600519', 'name': '贵州茅台', 'current_price': 1688.00},
            {'code': '601318', 'name': '中国平安', 'current_price': 42.35},
            {'code': '601899', 'name': '紫金矿业', 'current_price': 10.26},
            {'code': '600036', 'name': '招商银行', 'current_price': 31.28},
            {'code': '000858', 'name': '五粮液', 'current_price': 158.60},
            {'code': '300750', 'name': '宁德时代', 'current_price': 288.50},
        ]

        df = pd.DataFrame(mock_stocks)
        count = len(df)
        price = df['current_price']

        df['change_percent'] = rng.uniform(-5.0, 5.0, count)
        # The percentage change is relative to the previous close, so derive
        # the previous close first and the absolute change from it.
        pre_close = price / (1 + df['change_percent'] / 100)
        df['change_amount'] = price - pre_close
        df['volume'] = rng.uniform(1000000, 100000000, count)
        df['amount'] = df['volume'] * price
        df['open'] = price * rng.uniform(0.98, 1.02, count)
        # High/low must bracket both the open and the current price.
        df['high'] = np.maximum(df['open'], price) * rng.uniform(1.0, 1.05, count)
        df['low'] = np.minimum(df['open'], price) * rng.uniform(0.95, 1.0, count)
        df['pre_close'] = pre_close

        return df

    def filter_by_market(self, df: pd.DataFrame, market: str) -> pd.DataFrame:
        """Filter the stock list by market.

        Args:
            df: Stock list with a `code` column.
            market: 'sh' (Shanghai, codes starting with 6), 'sz' (Shenzhen
                main board, codes starting with 0 except 002/003), 'cyb'
                (ChiNext, codes starting with 3) or 'kc' (STAR market, codes
                starting with 688). Any other value returns `df` unchanged.

        Returns:
            pd.DataFrame: the matching rows.
        """
        if df.empty:
            return df

        codes = _bare_codes(df)
        if market == 'sh':
            filtered = df[codes.str.startswith('6')]
        elif market == 'sz':
            filtered = df[codes.str.startswith('0') & ~codes.str[:3].isin(('002', '003'))]
        elif market == 'cyb':
            filtered = df[codes.str.startswith('3')]
        elif market == 'kc':
            filtered = df[codes.str.startswith('688')]
        else:
            # Unknown market: return everything.
            filtered = df

        logger.info(f"按市场 [{market}] 筛选后得到 {len(filtered)} 只股票")
        return filtered

    @staticmethod
    def _resolve_output_path(output_path: Optional[str], extension: str) -> str:
        """Return the path to write to, creating its parent directory.

        When `output_path` is None a timestamped file name under
        `<three levels above this file>/data/processed/stock_list` is used.
        """
        if output_path is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_dir = os.path.join(
                os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
                'data', 'processed', 'stock_list'
            )
            output_path = os.path.join(output_dir, f"a_stock_list_{timestamp}.{extension}")

        parent = os.path.dirname(output_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        return output_path

    def save_to_csv(self, df: pd.DataFrame, output_path: Optional[str] = None) -> str:
        """Save the data to a CSV file.

        Args:
            df: Stock data.
            output_path: Destination path; generated automatically when None.

        Returns:
            str: the path written, or "" when `df` is empty (nothing saved).
        """
        if df.empty:
            logger.warning("数据为空,跳过保存")
            return ""

        output_path = self._resolve_output_path(output_path, 'csv')

        # utf-8-sig writes a BOM at the start of the file.
        df.to_csv(output_path, index=False, encoding='utf-8-sig')
        logger.info(f"A股股票列表已保存到: {output_path}")
        return output_path

    def save_to_json(self, df: pd.DataFrame, output_path: Optional[str] = None) -> str:
        """Save the data to a JSON file.

        The file holds `fetch_time`, `total_count` and a `stocks` list of
        row records. Missing and non-finite values are written as null so
        the output is strictly valid JSON.

        Args:
            df: Stock data.
            output_path: Destination path; generated automatically when None.

        Returns:
            str: the path written, or "" when `df` is empty (nothing saved).
        """
        if df.empty:
            logger.warning("数据为空,跳过保存")
            return ""

        output_path = self._resolve_output_path(output_path, 'json')

        records = [
            {key: _json_safe(value) for key, value in row.items()}
            for row in df.to_dict(orient='records')
        ]
        data = {
            'fetch_time': datetime.now().isoformat(),
            'total_count': len(df),
            'stocks': records
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            # allow_nan=False guarantees no bare NaN/Infinity tokens slip in.
            json.dump(data, f, ensure_ascii=False, indent=2, allow_nan=False)

        logger.info(f"A股股票列表已保存到: {output_path}")
        return output_path


def main() -> int:
    """Fetch the A-share stock list, print a summary and save it.

    Returns:
        int: process exit status, 0 on success and 1 when no data was fetched.
    """
    fetcher = AStockListFetcher()

    # Fetch all A-share stocks.
    all_stocks = fetcher.get_all_a_stocks()

    if all_stocks.empty:
        logger.error("获取A股股票列表失败,退出程序")
        return 1

    # Print summary statistics.
    print(f"\n获取成功!共获取 {len(all_stocks)} 只A股股票:")
    for label, count in count_by_board(all_stocks).items():
        print(f"- {label}: {count}")

    # Show the first 10 rows.
    print("\n前10条数据示例:")
    print(all_stocks.head(10).to_string(index=False))

    # Save the files.
    csv_path = fetcher.save_to_csv(all_stocks)
    json_path = fetcher.save_to_json(all_stocks)

    print("\n文件已保存:")
    print(f"- CSV: {csv_path}")
    print(f"- JSON: {json_path}")

    return 0


if __name__ == "__main__":
    sys.exit(main())