import requests import json import time from datetime import datetime BASE_URL = "http://localhost:7891/api" print("="*80) print(" 🎯 端到端流程测试 - 任务创建与监控") print("="*80) # 1. 创建任务 print("\n[1] 创建任务...") task_title = "端到端测试:验证三国量化数据获取与分析流程" r = requests.post( f"{BASE_URL}/create-task", json={ 'title': task_title, 'org': '户部', 'official': '赵云', 'priority': 'high', 'targetDept': '户部' } ) result = r.json() print(f" 创建结果: {json.dumps(result, ensure_ascii=False, indent=2)}") if not result.get('ok'): print("\n ❌ 任务创建失败!") exit(1) task_id = result.get('taskId') print(f"\n ✅ 任务创建成功!任务ID: {task_id}") # 2. 监控任务流转 print("\n" + "="*80) print(" 🔍 开始监控任务流转...") print("="*80) max_rounds = 30 round_num = 0 last_state = None last_now = None while round_num < max_rounds: round_num += 1 current_time = datetime.now().strftime('%H:%M:%S') # 获取live-status try: r = requests.get(f"{BASE_URL}/live-status") live_data = r.json() tasks = live_data.get('tasks', []) # 找到我们的任务 our_task = None for t in tasks: if t.get('id') == task_id: our_task = t break if our_task: current_state = our_task.get('state') current_now = our_task.get('now') # 检查状态变化 if current_state != last_state or current_now != last_now: print(f"\n[{current_time}] 📢 状态变化!") print(f" 任务: {our_task.get('title')}") print(f" 状态: {current_state}") print(f" 当前: {current_now}") if current_state == 'Done': print(f"\n 🎉 任务完成!") print(f" 输出: {our_task.get('output')}") break elif current_state in ('Cancelled', 'Blocked'): print(f"\n ⚠️ 任务终止!状态: {current_state}") print(f" 阻塞原因: {our_task.get('block')}") break last_state = current_state last_now = current_now else: # 状态未变化,静默等待 print(f"[{current_time}] ⏳ 等待... (状态: {current_state})", end='\r') else: print(f"[{current_time}] ❌ 找不到任务 {task_id}!") except Exception as e: print(f"[{current_time}] ⚠️ 请求异常: {e}") time.sleep(10) print("\n" + "="*80) print(" 📊 监控结束") print("="*80)