""" 数据库连接测试脚本 """ from sqlalchemy import create_engine, text from sqlalchemy.orm import sessionmaker from database_config import db_settings from loguru import logger import os def test_database_connection(): """测试数据库连接""" logger.info("开始测试数据库连接...") logger.info(f"数据库类型: {db_settings.db_type}") try: # 创建数据库引擎 engine = create_engine( db_settings.get_database_url(), echo=db_settings.echo_sql, pool_size=db_settings.pool_size, max_overflow=db_settings.max_overflow, pool_timeout=db_settings.pool_timeout, pool_recycle=db_settings.pool_recycle ) logger.info("数据库引擎创建成功") # 测试连接 with engine.connect() as conn: # 执行简单的查询 if db_settings.db_type == "sqlite": result = conn.execute(text("SELECT sqlite_version()")) version = result.scalar() logger.info(f"SQLite 版本: {version}") elif db_settings.db_type == "postgresql": result = conn.execute(text("SELECT version()")) version = result.scalar() logger.info(f"PostgreSQL 版本: {version}") logger.info("数据库连接测试成功!") # 创建测试表 conn.execute(text(""" CREATE TABLE IF NOT EXISTS test_table ( id INTEGER PRIMARY KEY, name TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """)) conn.commit() logger.info("测试表创建成功") # 插入测试数据 conn.execute(text("INSERT OR REPLACE INTO test_table (id, name) VALUES (:id, :name)"), {"id": 1, "name": "测试数据"}) conn.commit() logger.info("测试数据插入成功") # 查询测试数据 result = conn.execute(text("SELECT * FROM test_table WHERE id = 1")) row = result.fetchone() logger.info(f"查询结果: {row}") # 删除测试表 conn.execute(text("DROP TABLE test_table")) conn.commit() logger.info("测试表清理完成") logger.info("✅ 数据库测试全部通过!") return True except Exception as e: logger.error(f"❌ 数据库测试失败: {str(e)}") import traceback logger.error(traceback.format_exc()) return False if __name__ == "__main__": # 配置日志 logger.add("logs/database_test_{time}.log", rotation="1 day") # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 运行测试 success = test_database_connection() # 退出码 import sys sys.exit(0 if success else 1)