agent-team/internal/room/members.go

428 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package room
import (
"context"
"fmt"
"log"
"strings"
"sync"
"github.com/sdaduanbilei/agent-team/internal/agent"
"github.com/sdaduanbilei/agent-team/internal/llm"
"github.com/sdaduanbilei/agent-team/internal/skill"
"github.com/sdaduanbilei/agent-team/internal/store"
)
// runMembersParallel executes every entry of assignments (member name -> task
// text) in its own goroutine and blocks until all of them have returned.
//
// For each known member it sets the working status, emits the task-assign and
// file-read events, builds the prompt via buildMemberContext, drives the tool
// loop, stores the conversation in r.memberConvos and hands the reply to
// routeMemberOutput. Names that are not present in r.members are skipped.
//
// The returned map is keyed by member name. When a model call fails,
// runToolLoop records "[error] ..." for that member and returns an empty
// reply; in that case routing is skipped so the error entry is kept and no
// empty document is written over the member's target file.
func (r *Room) runMembersParallel(ctx context.Context, assignments map[string]string, board *SharedBoard, skillXML string) map[string]string {
	results := make(map[string]string, len(assignments))
	var mu sync.Mutex // guards results and the shared-map writes made by the goroutines below
	var wg sync.WaitGroup
	tools := skill.ToTools(r.skillMeta)

	for memberName, task := range assignments {
		wg.Add(1)
		go func(name, t string) {
			defer wg.Done()

			member, ok := r.members[name]
			if !ok {
				return
			}
			r.setStatus(StatusWorking, member.Config.Name, t)
			r.emit(Event{Type: EvtTaskAssign, From: r.master.Config.Name, To: name, Task: t})

			// Build the system prompt, the user task message and (optionally)
			// the pre-decided target file.
			memberSystem, taskMsg, targetFile := r.buildMemberContext(name, t, board, tools, skillXML)

			// Report every workspace file as read; the status line only shows
			// the first one.
			for i, f := range r.listWorkspaceFiles() {
				r.emit(Event{Type: EvtFileRead, Agent: name, Filename: f})
				if i == 0 {
					r.setStatus(StatusWorking, name, fmt.Sprintf("正在阅读 %s ...", f))
				}
			}

			memberMsgs := []llm.Message{
				llm.NewMsg("system", memberSystem),
				llm.NewMsg("user", taskMsg),
			}

			// File calls and dynamic-chapter calls announce the document being
			// worked on; plain chat calls announce nothing. The model call
			// itself is the same in all three cases.
			if targetFile != nil {
				docName := strings.TrimSuffix(targetFile.Path, ".md")
				r.emit(Event{Type: EvtFileWorking, Agent: name, Filename: targetFile.Path, Title: docName})
			} else if dynDir, dynOwner, _ := r.getDynamicFileInfo(); dynDir != "" && dynOwner == name && r.allStaticFilesDone() {
				r.emit(Event{Type: EvtFileWorking, Agent: name, Filename: "章节", Title: "章节"})
			}

			finalReply := r.runToolLoop(ctx, name, member, &memberMsgs, tools, &mu, results)
			if finalReply == "" {
				// runToolLoop only returns "" after a failed model call, and it
				// has already stored the error in results[name]. Routing an
				// empty reply would overwrite that error and, for file calls,
				// save a heading-only document.
				return
			}

			convo := append(memberMsgs, llm.NewMsg("assistant", finalReply))
			// r.memberConvos is shared by all goroutines started here; a plain
			// map must not be written concurrently.
			mu.Lock()
			if r.memberConvos == nil {
				r.memberConvos = make(map[string][]llm.Message)
			}
			r.memberConvos[name] = convo
			mu.Unlock()

			// Caller-decided output routing (file call vs. chat call).
			r.routeMemberOutput(ctx, name, t, member, finalReply, targetFile, board, &mu, results)

			// Persist member memory asynchronously; deliberately detached from
			// ctx and from wg.
			go r.updateMemberMemory(context.Background(), member, t, finalReply)
		}(memberName, task)
	}

	wg.Wait()
	return results
}
// buildMemberContext assembles the system prompt and the user task message
// for member name, and decides which project file (if any) the reply should
// be written to.
//
// The extra context passed to BuildSystemPrompt is made of the non-empty
// sections below, in this order, separated by a blank line:
// r.systemRules, the team XML, the shared-board context, skillXML (only when
// no tools are supplied) and the project context for this member.
//
// The task message starts with task and is extended with the workspace
// context, the "member_update_doc" prompt (when r.memberArtifacts already has
// an entry for the member) and the rendered "file_call_member" prompt (when a
// target file was found). The target file is taken from the task text first,
// falling back to findMemberTargetFile.
//
// r.memberArtifacts is read here without any locking.
func (r *Room) buildMemberContext(name, task string, board *SharedBoard, tools []llm.Tool, skillXML string) (system, taskMsg string, targetFile *ProjectFile) {
	boardSection := board.ToContext()

	// The skill XML is only inlined when the model has no tool definitions.
	skillSection := ""
	if len(tools) == 0 {
		skillSection = skillXML
	}

	teamSection := r.buildTeamXML()
	projectSection := r.buildProjectContext(name)

	candidates := []string{r.systemRules, teamSection, boardSection, skillSection, projectSection}
	sections := make([]string, 0, len(candidates))
	for _, section := range candidates {
		if section != "" {
			sections = append(sections, section)
		}
	}

	system = r.members[name].BuildSystemPrompt(strings.Join(sections, "\n\n"))

	log.Printf("[room %s] buildMemberContext for %s: projectTemplate=%v", r.Config.Name, name, r.projectTemplate != nil)

	taskMsg = task
	if wsCtx := r.buildWorkspaceContext(); wsCtx != "" {
		taskMsg += "\n\n" + wsCtx
	}
	// Indexing a nil map is safe, so no separate nil check is needed.
	if _, hasDoc := r.memberArtifacts[name]; hasDoc {
		taskMsg += "\n\n" + r.Prompt.R("member_update_doc")
	}

	// Prefer a target file named in the task text (supports revising an
	// existing file); otherwise use the caller-decided target.
	targetFile = r.parseTargetFileFromTask(name, task)
	if targetFile == nil {
		targetFile = r.findMemberTargetFile(name)
	}
	if targetFile == nil {
		return system, taskMsg, nil
	}

	taskMsg += "\n\n" + r.Prompt.Render("file_call_member", map[string]string{
		"FilePath": targetFile.Path,
		"DocName":  strings.TrimSuffix(targetFile.Path, ".md"),
	})
	return system, taskMsg, targetFile
}
// runToolLoop drives the tool-calling conversation for one member and returns
// the final text reply.
//
// Each round calls member.ChatWithTools with the messages accumulated in
// *memberMsgs. A response without tool calls ends the loop; otherwise the
// assistant message and one tool-result message per call are appended to
// *memberMsgs and the loop continues, for at most maxToolRounds rounds.
// If no text was produced, one extra call without tools is made using the
// "tool_loop_summarize" prompt, and a fixed placeholder is returned when that
// yields nothing either.
//
// On a failed model call inside the loop the function stores
// "[error] <err>" in results[name] (under mu) and returns "". That is the only
// case in which the return value is empty.
func (r *Room) runToolLoop(ctx context.Context, name string, member *agent.Agent, memberMsgs *[]llm.Message, tools []llm.Tool, mu *sync.Mutex, results map[string]string) string {
	const maxToolRounds = 10

	// closeToolUse ends the streaming "tool_use" indicator.
	closeToolUse := func() {
		r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "tool_use",
			Content: "", Streaming: false})
	}

	var finalReply string
	for round := 0; round < maxToolRounds; round++ {
		result, err := member.ChatWithTools(ctx, *memberMsgs, tools, nil)
		if err != nil {
			if round > 0 {
				// An earlier round left a streaming tool_use indicator open;
				// close it so it does not stay open after the failure.
				closeToolUse()
			}
			mu.Lock()
			results[name] = fmt.Sprintf("[error] %v", err)
			mu.Unlock()
			return ""
		}
		r.emitUsage(name, result.Usage)

		if len(result.ToolCalls) == 0 {
			finalReply = result.Content
			break
		}

		*memberMsgs = append(*memberMsgs, llm.Message{
			Role:      "assistant",
			Content:   result.Content,
			ToolCalls: result.ToolCalls,
		})

		// Announce the requested tools before executing any of them.
		toolNames := make([]string, 0, len(result.ToolCalls))
		for _, tc := range result.ToolCalls {
			toolNames = append(toolNames, tc.Function.Name)
		}
		r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "tool_use",
			Content: strings.Join(toolNames, ", "), Streaming: true})

		for _, tc := range result.ToolCalls {
			toolResult := r.executeToolCall(tc)
			*memberMsgs = append(*memberMsgs, llm.NewToolResultMsg(tc.ID, toolResult))
		}
		r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "tool_use",
			Content: "thinking", Streaming: true})
	}

	closeToolUse()

	// No text yet (round limit reached or empty content): ask once more with
	// tools disabled so the model has to answer in text.
	if finalReply == "" {
		*memberMsgs = append(*memberMsgs, llm.NewMsg("user", r.Prompt.R("tool_loop_summarize")))
		result, err := member.ChatWithTools(ctx, *memberMsgs, nil, nil)
		if err != nil {
			// Best effort: fall through to the placeholder reply, but leave a
			// trace of the failure.
			log.Printf("[room %s] tool loop summary for %s failed: %v", r.Config.Name, name, err)
		} else {
			// The call succeeded, so report its usage even if the content is empty.
			r.emitUsage(name, result.Usage)
			finalReply = result.Content
		}
	}
	if finalReply == "" {
		finalReply = "[任务执行完成,但未产生文本回复]"
	}
	return finalReply
}
// routeMemberOutput delivers a member's final reply as either a file call or
// a chat call.
//
// File call (targetFile != nil): the trimmed reply is saved to
// targetFile.Path, prefixed with a "# <doc name>" heading when it does not
// already start with "# ". The artifact, file-done and task-done events are
// emitted, the path is recorded in r.memberArtifacts, and a short completion
// notice, not the document text, becomes the member's result.
//
// Chat call (targetFile == nil): the reply is passed to saveAgentOutput. If
// that reports the reply as routed, the same completion notice is produced
// with the title from extractTitle; otherwise the reply itself is the result.
//
// In every case the full reply is appended to the room history, and
// results[name] is written under mu. ctx and member are currently unused.
func (r *Room) routeMemberOutput(ctx context.Context, name, task string, member *agent.Agent, finalReply string, targetFile *ProjectFile, board *SharedBoard, mu *sync.Mutex, results map[string]string) {
	// storeDocument persists a "document" message pointing at path.
	storeDocument := func(title, path string) {
		if r.Store == nil {
			return
		}
		r.Store.InsertMessage(&store.Message{
			RoomID: r.Config.Name, Agent: name, Role: "member",
			Content: title, Filename: path, PartType: "document",
			GroupID: &r.currentGroupID,
		})
	}

	// announceDone emits and persists the completion notice, and records it as
	// the member's result and board entry.
	announceDone := func(title string) {
		statusMsg := fmt.Sprintf("《%s》已完成@master 请查阅。", title)
		r.emit(Event{Type: EvtAgentMessage, Agent: name, Role: "member", Content: statusMsg, NoStore: true})
		if r.Store != nil {
			r.Store.InsertMessage(&store.Message{
				RoomID: r.Config.Name, Agent: name, Role: "member",
				Content: statusMsg, PartType: "text",
				GroupID: &r.currentGroupID,
			})
		}
		mu.Lock()
		results[name] = statusMsg
		mu.Unlock()
		r.AppendHistory("member", name, finalReply)
		board.Add(name, statusMsg, "draft")
	}

	if targetFile != nil {
		// FILE CALL: save straight into the known target file.
		log.Printf("[room %s] FILE CALL for %s → %s (%d chars)", r.Config.Name, name, targetFile.Path, len(finalReply))
		docName := strings.TrimSuffix(targetFile.Path, ".md")

		content := strings.TrimSpace(finalReply)
		if !strings.HasPrefix(content, "# ") {
			content = "# " + docName + "\n\n" + content
		}
		r.saveWorkspace(targetFile.Path, content)

		r.emit(Event{Type: EvtArtifact, Agent: name, Filename: targetFile.Path, Title: docName})
		r.emit(Event{Type: EvtFileDone, Agent: name, Filename: targetFile.Path, Title: docName})
		r.emit(Event{Type: EvtTaskDone, Agent: name, Task: task})

		// This function runs concurrently for several members, so the lazily
		// created map must not be written without a lock. Readers elsewhere
		// are not covered by this.
		mu.Lock()
		if r.memberArtifacts == nil {
			r.memberArtifacts = make(map[string]string)
		}
		r.memberArtifacts[name] = targetFile.Path
		mu.Unlock()

		storeDocument(docName, targetFile.Path)
		announceDone(docName)
		return
	}

	// CHAT CALL: no target file, use the legacy routing.
	log.Printf("[room %s] CHAT CALL for %s (no targetFile, %d chars)", r.Config.Name, name, len(finalReply))
	savedPath, routed := r.saveAgentOutput(name, finalReply, task)
	if !routed {
		mu.Lock()
		results[name] = finalReply
		mu.Unlock()
		r.AppendHistory("member", name, finalReply)
		if strings.TrimSpace(finalReply) != "" {
			board.Add(name, finalReply, "draft")
		}
		return
	}

	title := extractTitle(finalReply)
	storeDocument(title, savedPath)
	announceDone(title)
}
// runSecondRound looks for member-to-member delegation in the first-round
// results and runs those delegated tasks as a second parallel round.
//
// Every value in results is passed to parseAssignments. An assignment is
// scheduled only if its name is in r.members and that name has no entry in
// results yet. When several results delegate to the same member, the task
// that is kept depends on map iteration order. The function returns once all
// second-round goroutines have finished; their outcomes are written into
// results under mu.
//
// When runToolLoop fails for a member it records "[error] ..." in results and
// returns an empty reply; that entry is left untouched.
func (r *Room) runSecondRound(ctx context.Context, results map[string]string, board *SharedBoard, tools []llm.Tool, skillXML string, mu *sync.Mutex) {
	secondRound := make(map[string]string)
	for _, result := range results {
		for name, task := range parseAssignments(result) {
			if _, isMember := r.members[name]; !isMember {
				continue
			}
			if _, alreadyDone := results[name]; alreadyDone {
				continue
			}
			secondRound[name] = task
		}
	}
	if len(secondRound) == 0 {
		return
	}
	log.Printf("[room %s] 检测到成员间委派,执行第二轮: %v", r.Config.Name, secondRound)

	var wg sync.WaitGroup
	for memberName, task := range secondRound {
		wg.Add(1)
		go func(name, t string) {
			defer wg.Done()

			member, ok := r.members[name]
			if !ok {
				return
			}
			r.setStatus(StatusWorking, member.Config.Name, t)
			r.emit(Event{Type: EvtTaskAssign, From: "team", To: name, Task: t})

			// The target file chosen by buildMemberContext is ignored in this
			// round; output goes through saveAgentOutput only.
			memberSystem, taskMsg, _ := r.buildMemberContext(name, t, board, tools, skillXML)

			for i, f := range r.listWorkspaceFiles() {
				r.emit(Event{Type: EvtFileRead, Agent: name, Filename: f})
				if i == 0 {
					r.setStatus(StatusWorking, name, fmt.Sprintf("正在阅读 %s ...", f))
				}
			}

			memberMsgs := []llm.Message{
				llm.NewMsg("system", memberSystem),
				llm.NewMsg("user", taskMsg),
			}
			finalReply := r.runToolLoop(ctx, name, member, &memberMsgs, tools, mu, results)
			if finalReply == "" {
				// Model call failed; runToolLoop already stored the error in
				// results[name]. Do not overwrite it with an empty reply.
				return
			}

			// The recorded outcome is the first return value of
			// saveAgentOutput when the reply was routed, else the reply itself.
			// NOTE(review): routeMemberOutput treats that first return value as
			// the saved file path and builds a completion notice from the
			// title instead — confirm which of the two is intended here.
			outcome := finalReply
			if routedValue, routed := r.saveAgentOutput(name, finalReply, t); routed {
				outcome = routedValue
			}

			mu.Lock()
			results[name] = outcome
			mu.Unlock()
			r.AppendHistory("member", name, finalReply)
			if strings.TrimSpace(outcome) != "" {
				board.Add(name, outcome, "draft")
			}

			// Persist member memory asynchronously; deliberately detached from
			// ctx and from wg.
			go r.updateMemberMemory(context.Background(), member, t, finalReply)
		}(memberName, task)
	}
	wg.Wait()
}
// runChallengeRound asks every member whose Config.CanChallenge is set to
// review the current work, running the reviews in parallel and waiting for
// all of them.
//
// Nothing happens when there are no challengers, or when both the shared
// board context and the workspace file list are empty. Each reviewer's reply
// is streamed token by token as "challenge" events, closed with a final
// non-streaming event, and stored when non-empty. Only replies containing
// "CHALLENGE:" are added to the board and to the room history.
//
// A failed model call is logged and that reviewer is skipped; if tokens had
// already been streamed, the stream is closed first.
func (r *Room) runChallengeRound(ctx context.Context, board *SharedBoard, skillXML string) {
	var challengers []string
	for name, member := range r.members {
		if member.Config.CanChallenge {
			challengers = append(challengers, name)
		}
	}
	if len(challengers) == 0 {
		return
	}

	boardCtx := board.ToContext()
	if boardCtx == "" && len(r.listWorkspaceFiles()) == 0 {
		return
	}

	var wg sync.WaitGroup
	for _, name := range challengers {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			member := r.members[n]

			// Resulting order: system rules, team XML, workspace context,
			// board context, skill XML, project context.
			extraCtx := boardCtx + "\n\n" + skillXML
			if wsCtx := r.buildWorkspaceContext(); wsCtx != "" {
				extraCtx = wsCtx + "\n\n" + extraCtx
			}
			if teamCtx := r.buildTeamXML(); teamCtx != "" {
				extraCtx = teamCtx + "\n\n" + extraCtx
			}
			if r.systemRules != "" {
				extraCtx = r.systemRules + "\n\n" + extraCtx
			}
			if projectCtx := r.buildProjectContext(n); projectCtx != "" {
				extraCtx = extraCtx + "\n\n" + projectCtx
			}

			memberSystem := member.BuildSystemPrompt(extraCtx)
			memberMsgs := []llm.Message{
				llm.NewMsg("system", memberSystem+"\n\n"+r.Prompt.R("challenge_review")),
				llm.NewMsg("user", "请审阅 workspace 中的文档并给出你的专业反馈。"),
			}

			// closeStream ends the streamed challenge message.
			closeStream := func() {
				r.emit(Event{Type: EvtAgentMessage, Agent: n, Role: "challenge", Content: "", Streaming: false, NoStore: true})
			}

			var reply strings.Builder
			_, usage, err := member.ChatWithUsage(ctx, memberMsgs, func(token string) {
				reply.WriteString(token)
				r.emit(Event{Type: EvtAgentMessage, Agent: n, Role: "challenge", Content: token, Streaming: true})
			})
			if err != nil {
				log.Printf("[room %s] challenge review by %s failed: %v", r.Config.Name, n, err)
				if reply.Len() > 0 {
					// Tokens were already streamed; close the message so it
					// does not stay open.
					closeStream()
				}
				return
			}
			r.emitUsage(n, usage)
			closeStream()

			result := reply.String()
			if r.Store != nil && result != "" {
				r.Store.InsertMessage(&store.Message{
					RoomID: r.Config.Name, Agent: n, Role: "challenge",
					Content: result, PartType: "text",
					GroupID: &r.currentGroupID,
				})
			}
			if strings.Contains(result, "CHALLENGE:") {
				board.Add(n, result, "challenge")
				r.AppendHistory("challenge", n, result)
			}
		}(name)
	}
	wg.Wait()
}