package room import ( "context" "fmt" "log" "strings" "sync" "github.com/sdaduanbilei/agent-team/internal/agent" "github.com/sdaduanbilei/agent-team/internal/llm" "github.com/sdaduanbilei/agent-team/internal/skill" "github.com/sdaduanbilei/agent-team/internal/store" ) func (r *Room) runMembersParallel(ctx context.Context, assignments map[string]string, board *SharedBoard, skillXML string) map[string]string { results := make(map[string]string) var mu sync.Mutex var wg sync.WaitGroup tools := skill.ToTools(r.skillMeta) for memberName, task := range assignments { wg.Add(1) go func(name, t string) { defer wg.Done() member, ok := r.members[name] if !ok { return } r.setStatus(StatusWorking, member.Config.Name, t) r.emit(Event{Type: EvtTaskAssign, From: r.master.Config.Name, To: name, Task: t}) // 构建系统提示上下文 memberSystem, taskMsg, targetFile := r.buildMemberContext(name, t, board, tools, skillXML) // 标记已查阅 workspace 文件 wsFiles := r.listWorkspaceFiles() for i, f := range wsFiles { r.emit(Event{Type: EvtFileRead, Agent: name, Filename: f}) if i == 0 { r.setStatus(StatusWorking, name, fmt.Sprintf("正在阅读 %s ...", f)) } } memberMsgs := []llm.Message{ llm.NewMsg("system", memberSystem), llm.NewMsg("user", taskMsg), } // tool calling 循环 finalReply := r.runToolLoop(ctx, name, member, &memberMsgs, tools, &mu, results) if r.memberConvos == nil { r.memberConvos = make(map[string][]llm.Message) } r.memberConvos[name] = memberMsgs r.memberConvos[name] = append(r.memberConvos[name], llm.NewMsg("assistant", finalReply)) // Caller-Decided 输出路由 r.routeMemberOutput(ctx, name, t, member, finalReply, targetFile, board, &mu, results) // 异步保存成员记忆 go r.updateMemberMemory(context.Background(), member, t, finalReply) }(memberName, task) } wg.Wait() // 检查成员间委派 r.runSecondRound(ctx, results, board, tools, skillXML, &mu) return results } // buildMemberContext 构建成员的系统提示和任务消息 func (r *Room) buildMemberContext(name, task string, board *SharedBoard, tools []llm.Tool, skillXML string) (system, taskMsg string, targetFile *ProjectFile) { boardCtx := board.ToContext() var extraCtx string if len(tools) == 0 { extraCtx = skillXML } if boardCtx != "" { if extraCtx != "" { extraCtx = boardCtx + "\n\n" + extraCtx } else { extraCtx = boardCtx } } teamCtx := r.buildTeamXML() if teamCtx != "" { if extraCtx != "" { extraCtx = teamCtx + "\n\n" + extraCtx } else { extraCtx = teamCtx } } if r.systemRules != "" { if extraCtx != "" { extraCtx = r.systemRules + "\n\n" + extraCtx } else { extraCtx = r.systemRules } } if projectCtx := r.buildProjectContext(name); projectCtx != "" { if extraCtx != "" { extraCtx = extraCtx + "\n\n" + projectCtx } else { extraCtx = projectCtx } } member := r.members[name] system = member.BuildSystemPrompt(extraCtx) taskMsg = task log.Printf("[room %s] buildMemberContext for %s: projectTemplate=%v", r.Config.Name, name, r.projectTemplate != nil) if wsCtx := r.buildWorkspaceContext(); wsCtx != "" { taskMsg = task + "\n\n" + wsCtx } if r.memberArtifacts != nil { if _, hasDoc := r.memberArtifacts[name]; hasDoc { taskMsg += "\n\n你之前已经产出过文档。请在原文档基础上进行补充和修改,不要重新写一份全新的文档。保留原有内容中仍然有效的部分,合并新的调研结果。" } } // Caller-Decided: 预先确定目标文件 targetFile = r.findMemberTargetFile(name) if targetFile != nil { docName := strings.TrimSuffix(targetFile.Path, ".md") taskMsg += fmt.Sprintf("\n\n\n你需要产出文件《%s》。工作完成后,最终回复只输出 Markdown 正文(以 # %s 开头),不要包含交流内容。\n", targetFile.Path, docName, docName) } return } // runToolLoop 执行 tool calling 循环,返回最终回复 func (r *Room) runToolLoop(ctx context.Context, name string, member *agent.Agent, memberMsgs *[]llm.Message, tools []llm.Tool, mu *sync.Mutex, results map[string]string) string { var finalReply string for round := 0; round < 10; round++ { result, err := member.ChatWithTools(ctx, *memberMsgs, tools, nil) if err != nil { mu.Lock() results[name] = fmt.Sprintf("[error] %v", err) mu.Unlock() return "" } r.emitUsage(name, result.Usage) if len(result.ToolCalls) == 0 { finalReply = result.Content break } assistantMsg := llm.Message{ Role: "assistant", Content: result.Content, ToolCalls: result.ToolCalls, } *memberMsgs = append(*memberMsgs, assistantMsg) var toolNames []string for _, tc := range result.ToolCalls { toolNames = append(toolNames, tc.Function.Name) } r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "tool_use", Content: strings.Join(toolNames, ", "), Streaming: true}) for _, tc := range result.ToolCalls { toolResult := r.executeToolCall(tc) *memberMsgs = append(*memberMsgs, llm.NewToolResultMsg(tc.ID, toolResult)) } r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "tool_use", Content: "thinking", Streaming: true}) } // 关闭 tool_use 状态 r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "tool_use", Content: "", Streaming: false}) // 强制无 tools 生成总结 if finalReply == "" { *memberMsgs = append(*memberMsgs, llm.NewMsg("user", "请根据以上所有工具调用结果,直接输出完整的任务回复。不要再调用任何工具。")) result, err := member.ChatWithTools(ctx, *memberMsgs, nil, nil) if err == nil && result.Content != "" { finalReply = result.Content r.emitUsage(name, result.Usage) } } if finalReply == "" { finalReply = "[任务执行完成,但未产生文本回复]" } return finalReply } // routeMemberOutput 处理成员输出路由(file call 或 chat call) func (r *Room) routeMemberOutput(ctx context.Context, name, task string, member *agent.Agent, finalReply string, targetFile *ProjectFile, board *SharedBoard, mu *sync.Mutex, results map[string]string) { if targetFile != nil { log.Printf("[room %s] FILE CALL for %s → %s (%d chars)", r.Config.Name, name, targetFile.Path, len(finalReply)) // FILE CALL: 直接保存到已知目标文件 content := strings.TrimSpace(finalReply) if !strings.HasPrefix(content, "# ") { content = "# " + strings.TrimSuffix(targetFile.Path, ".md") + "\n\n" + content } r.saveWorkspace(targetFile.Path, content) docName := strings.TrimSuffix(targetFile.Path, ".md") r.emit(Event{Type: EvtArtifact, Agent: name, Filename: targetFile.Path, Title: docName}) r.emit(Event{Type: EvtTaskDone, Agent: name, Task: task}) if r.memberArtifacts == nil { r.memberArtifacts = make(map[string]string) } r.memberArtifacts[name] = targetFile.Path if r.Store != nil { r.Store.InsertMessage(&store.Message{ RoomID: r.Config.Name, Agent: name, Role: "member", Content: docName, Filename: targetFile.Path, PartType: "document", GroupID: &r.currentGroupID, }) } // 文档已保存,发送静态完成消息到聊天 statusMsg := fmt.Sprintf("《%s》已完成,@master 请查阅。", docName) r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "member", Content: statusMsg, NoStore: true}) if r.Store != nil { r.Store.InsertMessage(&store.Message{ RoomID: r.Config.Name, Agent: name, Role: "member", Content: statusMsg, PartType: "text", GroupID: &r.currentGroupID, }) } mu.Lock() results[name] = statusMsg mu.Unlock() r.AppendHistory("member", name, finalReply) board.Add(name, statusMsg, "draft") } else { // CHAT CALL: 走旧路由(targetFile 为 nil) log.Printf("[room %s] CHAT CALL for %s (no targetFile, %d chars)", r.Config.Name, name, len(finalReply)) savedPath, routed := r.saveAgentOutput(name, finalReply, task) if routed { if r.Store != nil { title := extractTitle(finalReply) r.Store.InsertMessage(&store.Message{ RoomID: r.Config.Name, Agent: name, Role: "member", Content: title, Filename: savedPath, PartType: "document", GroupID: &r.currentGroupID, }) } docTitle := extractTitle(finalReply) statusMsg := fmt.Sprintf("《%s》已完成,@master 请查阅。", docTitle) r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "member", Content: statusMsg, NoStore: true}) if r.Store != nil { r.Store.InsertMessage(&store.Message{ RoomID: r.Config.Name, Agent: name, Role: "member", Content: statusMsg, PartType: "text", GroupID: &r.currentGroupID, }) } mu.Lock() results[name] = statusMsg mu.Unlock() r.AppendHistory("member", name, finalReply) board.Add(name, statusMsg, "draft") } else { mu.Lock() results[name] = finalReply mu.Unlock() r.AppendHistory("member", name, finalReply) if strings.TrimSpace(finalReply) != "" { board.Add(name, finalReply, "draft") } } } } // runSecondRound 检查成员间委派,自动分派第二轮 func (r *Room) runSecondRound(ctx context.Context, results map[string]string, board *SharedBoard, tools []llm.Tool, skillXML string, mu *sync.Mutex) { secondRound := make(map[string]string) for _, result := range results { subAssignments := parseAssignments(result) for name, task := range subAssignments { if _, isMember := r.members[name]; isMember { if _, alreadyDone := results[name]; !alreadyDone { secondRound[name] = task } } } } if len(secondRound) == 0 { return } log.Printf("[room %s] 检测到成员间委派,执行第二轮: %v", r.Config.Name, secondRound) var wg sync.WaitGroup for memberName, task := range secondRound { wg.Add(1) go func(name, t string) { defer wg.Done() member, ok := r.members[name] if !ok { return } r.setStatus(StatusWorking, member.Config.Name, t) r.emit(Event{Type: EvtTaskAssign, From: "team", To: name, Task: t}) memberSystem, taskMsg, _ := r.buildMemberContext(name, t, board, tools, skillXML) wsFiles := r.listWorkspaceFiles() for i, f := range wsFiles { r.emit(Event{Type: EvtFileRead, Agent: name, Filename: f}) if i == 0 { r.setStatus(StatusWorking, name, fmt.Sprintf("正在阅读 %s ...", f)) } } memberMsgs := []llm.Message{ llm.NewMsg("system", memberSystem), llm.NewMsg("user", taskMsg), } finalReply := r.runToolLoop(ctx, name, member, &memberMsgs, tools, mu, results) statusMsg, routed := r.saveAgentOutput(name, finalReply, t) if routed { mu.Lock() results[name] = statusMsg mu.Unlock() r.AppendHistory("member", name, finalReply) if strings.TrimSpace(statusMsg) != "" { board.Add(name, statusMsg, "draft") } } else { mu.Lock() results[name] = finalReply mu.Unlock() r.AppendHistory("member", name, finalReply) if strings.TrimSpace(finalReply) != "" { board.Add(name, finalReply, "draft") } } go r.updateMemberMemory(context.Background(), member, t, finalReply) }(memberName, task) } wg.Wait() } func (r *Room) runChallengeRound(ctx context.Context, board *SharedBoard, skillXML string) { var challengers []string for name, member := range r.members { if member.Config.CanChallenge { challengers = append(challengers, name) } } if len(challengers) == 0 { return } boardCtx := board.ToContext() if boardCtx == "" && len(r.listWorkspaceFiles()) == 0 { return } var wg sync.WaitGroup for _, name := range challengers { wg.Add(1) go func(n string) { defer wg.Done() member := r.members[n] extraCtx := boardCtx + "\n\n" + skillXML if wsCtx := r.buildWorkspaceContext(); wsCtx != "" { extraCtx = wsCtx + "\n\n" + extraCtx } teamCtx := r.buildTeamXML() if teamCtx != "" { extraCtx = teamCtx + "\n\n" + extraCtx } if r.systemRules != "" { extraCtx = r.systemRules + "\n\n" + extraCtx } if projectCtx := r.buildProjectContext(n); projectCtx != "" { extraCtx = extraCtx + "\n\n" + projectCtx } memberSystem := member.BuildSystemPrompt(extraCtx) memberMsgs := []llm.Message{ llm.NewMsg("system", memberSystem+"\n\n审阅 workspace 中的文档内容(而非看板摘要)。如果发现问题或需要质疑,请输出 CHALLENGE:你的具体意见。如果没有问题,输出 AGREE。注意:只评审你职责范围内的内容。禁止使用@提及任何人,禁止建议分配任务。"), llm.NewMsg("user", "请审阅 workspace 中的文档并给出你的专业反馈。"), } var reply strings.Builder _, usage, err := member.ChatWithUsage(ctx, memberMsgs, func(token string) { reply.WriteString(token) r.emit(Event{Type: EvtAgentMessage, Agent: n, Role: "challenge", Content: token, Streaming: true}) }) if err != nil { return } r.emitUsage(n, usage) r.emit(Event{Type: EvtAgentMessage, Agent: n, Role: "challenge", Content: "", Streaming: false, NoStore: true}) result := reply.String() if r.Store != nil && result != "" { r.Store.InsertMessage(&store.Message{ RoomID: r.Config.Name, Agent: n, Role: "challenge", Content: result, PartType: "text", GroupID: &r.currentGroupID, }) } if strings.Contains(result, "CHALLENGE:") { board.Add(n, result, "challenge") r.AppendHistory("challenge", n, result) } }(name) } wg.Wait() }