Files
sanguo_quant_live/agent_runner.py
T

204 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
赵云Agent启动脚本
监听新任务,自动执行并报告
"""
import os
import sys
import time
import json
from datetime import datetime
from typing import Optional, Dict, Any
import subprocess
import signal
import atexit
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class AgentRunner:
"""Agent运行器"""
def __init__(self, agent_name: str = "zhaoyun"):
self.agent_name = agent_name
self.task_dir = "./management/tasks/pending/"
self.agent_dir = f"./management/agents/{agent_name}/"
# 确保目录存在
os.makedirs(self.task_dir, exist_ok=True)
os.makedirs(self.agent_dir, exist_ok=True)
self.running = False
logger.info(f"Agent {agent_name} 初始化完成")
def check_for_new_task(self) -> Optional[Dict[str, Any]]:
"""检查是否有新任务"""
try:
files = os.listdir(self.task_dir)
for file in sorted(files):
if file.endswith('.json') and not file.startswith('.'):
task_file = os.path.join(self.task_dir, file)
with open(task_file, 'r', encoding='utf-8') as f:
task = json.load(f)
# 检查任务是否已经分配给我们
if task.get('assigned_to') == self.agent_name:
# 执行任务
task_result = self.execute_task(task)
# 移动任务到完成目录
done_dir = "./management/tasks/done/"
os.makedirs(done_dir, exist_ok=True)
done_file = os.path.join(done_dir, file)
os.rename(task_file, done_file)
logger.info(f"任务 {file} 执行完成")
return task_result
return None
except Exception as e:
logger.error(f"检查新任务失败: {e}")
return None
def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行任务"""
try:
task_id = task.get('task_id', 'unknown')
logger.info(f"开始执行任务: {task_id}")
# 根据任务类型执行
task_type = task.get('type', 'unknown')
if task_type == 'data_download':
return self.execute_data_download_task(task)
elif task_type == 'research':
return self.execute_research_task(task)
elif task_type == 'analysis':
return self.execute_analysis_task(task)
else:
logger.warning(f"未知任务类型: {task_type}")
return {"status": "unknown_task", "task_id": task_id}
except Exception as e:
logger.error(f"执行任务失败: {e}")
return {"status": "error", "error": str(e)}
def execute_data_download_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行数据下载任务"""
try:
task_desc = task.get('description', '')
logger.info(f"执行数据下载任务: {task_desc}")
# 简化版执行
result = {
"status": "completed",
"agent": self.agent_name,
"task_id": task.get('task_id', 'unknown'),
"executed_at": datetime.now().isoformat(),
"execution_time": time.time(),
"output": {
"message": f"Agent {self.agent_name} 成功执行数据下载任务",
"task_type": "data_download",
"timestamp": datetime.now().isoformat()
}
}
logger.info(f"数据下载任务执行完成")
return result
def execute_research_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行调研任务"""
try:
task_desc = task.get('description', '')
logger.info(f"执行调研任务: {task_desc}")
result = {
"status": "completed",
"agent": self.agent_name,
"task_id": task.get('task_id', 'unknown'),
"executed_at": datetime.now().isoformat(),
"execution_time": time.time(),
"output": {
"message": f"Agent {self.agent_name} 成功执行调研任务",
"task_type": "research",
"timestamp": datetime.now().isoformat()
}
}
logger.info(f"调研任务执行完成")
return result
def execute_analysis_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行分析任务"""
try:
task_desc = task.get('description', '')
logger.info(f"执行分析任务: {task_desc}")
result = {
"status": "completed",
"agent": self.agent_name,
"task_id": task.get('task_id', 'unknown'),
"executed_at": datetime.now().isoformat(),
"execution_time": time.time(),
"output": {
"message": f"Agent {self.agent_name} 成功执行分析任务",
"task_type": "analysis",
"timestamp": datetime.now().isoformat()
}
}
logger.info(f"分析任务执行完成")
return result
def run(self):
"""运行Agent"""
logger.info(f"Agent {self.agent_name} 启动运行...")
self.running = True
try:
while self.running:
task = self.check_for_new_task()
if task:
logger.info(f"发现新任务,正在执行...")
# 添加延迟,避免过于频繁的检查
time.sleep(30)
except KeyboardInterrupt:
logger.info(f"Agent {self.agent_name} 收到停止信号")
except Exception as e:
logger.error(f"Agent运行异常: {e}")
self.running = False
logger.info(f"Agent {self.agent_name} 停止运行.")
def stop(self):
"""停止Agent"""
self.running = False
logger.info(f"Agent {self.agent_name} 正在停止...")
def main():
"""主函数"""
runner = AgentRunner()
# 注册退出处理
atexit.register(runner.stop)
try:
runner.run()
except Exception as e:
logger.error(f"Agent运行失败: {e}")
sys.exit(1)
if __name__ == "__main__":
main()