scorpio d6df056687 feat: Part 模型 + 文件版本追踪 + 写手团队工作流 v2
- 数据层:messages 表增加 part_type 字段,新建 file_versions 表支持版本追踪
- 后端:saveWorkspace 版本追踪、saveAgentOutput 源头分离、generateBriefMessage 成员简报
- 后端:applyDocumentEdit 增量编辑、buildWorkflowStep phase-aware 工作流引擎
- API:文件版本查询/回退接口
- 前端:part_type 驱动渲染,产物面板版本历史
- 新增写手团队(主编/搜索员/策划编辑/合规审查员)配置
- store 模块、scheduler 模块、web-search skill

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 18:44:34 +08:00

569 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package scheduler
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
// Schedule 定时任务定义
type Schedule struct {
ID string `json:"id"`
RoomID string `json:"room_id"`
Cron string `json:"cron"` // cron 表达式,如 "0 8 * * *";单次任务用 "once YYYY-MM-DD HH:MM"
Message string `json:"message"` // 注入的消息内容
UserName string `json:"user_name"` // 虚拟用户名,默认 "scheduler"
Enabled bool `json:"enabled"`
Once bool `json:"once"` // 是否为单次任务
CreatedAt string `json:"created_at"`
LastRunAt string `json:"last_run_at"`
}
// RoomHandler 房间接口,供 scheduler 调用
type RoomHandler interface {
HandleUserMessage(ctx context.Context, userName, msg string) error
}
// BroadcastFunc 事件广播回调
type BroadcastFunc func(roomID string, eventType, message, userName string)
// ScheduleStore 定时任务存储接口
type ScheduleStore interface {
GetAllEnabled() ([]StoreSchedule, error)
UpdateLastRun(id, lastRunAt string) error
DisableSchedule(id string) error
}
// StoreSchedule 从 store 包导入的 Schedule 结构
type StoreSchedule struct {
ID string
RoomID string
Cron string
Message string
UserName string
Enabled bool
Once bool
CreatedAt string
LastRunAt string
}
// Scheduler 定时任务调度器
type Scheduler struct {
rooms map[string]RoomHandler
roomsDir string
mu sync.RWMutex
cancel context.CancelFunc
Broadcast BroadcastFunc
Store ScheduleStore
}
// New 创建调度器
func New(roomsDir string) *Scheduler {
return &Scheduler{
rooms: make(map[string]RoomHandler),
roomsDir: roomsDir,
}
}
// SetRooms 注入 room 引用
func (s *Scheduler) SetRooms(rooms map[string]RoomHandler) {
s.mu.Lock()
defer s.mu.Unlock()
s.rooms = rooms
}
// Start 启动调度器,每 30 秒检查一次
func (s *Scheduler) Start(ctx context.Context) {
ctx, s.cancel = context.WithCancel(ctx)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
log.Println("[scheduler] 调度器已启动,每 30 秒检查一次")
// 启动时立即检查一次
s.tick(ctx)
for {
select {
case <-ctx.Done():
log.Println("[scheduler] 调度器已停止")
return
case <-ticker.C:
s.tick(ctx)
}
}
}
// Stop 优雅关闭
func (s *Scheduler) Stop() {
if s.cancel != nil {
s.cancel()
}
}
// tick 检查所有房间的定时任务
func (s *Scheduler) tick(ctx context.Context) {
now := time.Now()
// 优先使用 Store
if s.Store != nil {
s.tickFromStore(ctx, now)
return
}
entries, err := os.ReadDir(s.roomsDir)
if err != nil {
return
}
for _, e := range entries {
if !e.IsDir() {
continue
}
roomID := e.Name()
schedules, err := s.LoadSchedules(roomID)
if err != nil || len(schedules) == 0 {
continue
}
changed := false
for i := range schedules {
sch := &schedules[i]
if !sch.Enabled {
continue
}
matched := false
if sch.Once {
matched = matchOnce(sch.Cron, now)
} else {
expr, err := ParseCron(sch.Cron)
if err != nil {
continue
}
matched = expr.Match(now)
}
if !matched {
continue
}
if sameMinute(sch.LastRunAt, now) {
continue
}
s.mu.RLock()
room := s.rooms[roomID]
s.mu.RUnlock()
if room == nil {
continue
}
userName := sch.UserName
if userName == "" {
userName = "scheduler"
}
log.Printf("[scheduler] 触发定时任务: room=%s, schedule=%s, message=%s", roomID, sch.ID, sch.Message)
if s.Broadcast != nil {
s.Broadcast(roomID, "schedule_run", sch.Message, userName)
}
go func(r RoomHandler, un, msg string) {
if err := r.HandleUserMessage(ctx, un, msg); err != nil {
log.Printf("[scheduler] 执行失败: %v", err)
}
}(room, userName, sch.Message)
sch.LastRunAt = now.Format(time.RFC3339)
changed = true
if sch.Once {
sch.Enabled = false
log.Printf("[scheduler] 单次任务已完成并禁用: %s", sch.ID)
}
}
if changed {
s.SaveSchedules(roomID, schedules)
}
}
}
func (s *Scheduler) tickFromStore(ctx context.Context, now time.Time) {
all, err := s.Store.GetAllEnabled()
if err != nil || len(all) == 0 {
return
}
for _, sch := range all {
matched := false
if sch.Once {
matched = matchOnce(sch.Cron, now)
} else {
expr, err := ParseCron(sch.Cron)
if err != nil {
continue
}
matched = expr.Match(now)
}
if !matched {
continue
}
if sameMinute(sch.LastRunAt, now) {
continue
}
s.mu.RLock()
room := s.rooms[sch.RoomID]
s.mu.RUnlock()
if room == nil {
continue
}
userName := sch.UserName
if userName == "" {
userName = "scheduler"
}
log.Printf("[scheduler] 触发定时任务: room=%s, schedule=%s, message=%s", sch.RoomID, sch.ID, sch.Message)
if s.Broadcast != nil {
s.Broadcast(sch.RoomID, "schedule_run", sch.Message, userName)
}
go func(r RoomHandler, un, msg string) {
if err := r.HandleUserMessage(ctx, un, msg); err != nil {
log.Printf("[scheduler] 执行失败: %v", err)
}
}(room, userName, sch.Message)
s.Store.UpdateLastRun(sch.ID, now.Format(time.RFC3339))
if sch.Once {
s.Store.DisableSchedule(sch.ID)
log.Printf("[scheduler] 单次任务已完成并禁用: %s", sch.ID)
}
}
}
func sameMinute(lastRunAt string, now time.Time) bool {
if lastRunAt == "" {
return false
}
lastRun, err := time.Parse(time.RFC3339, lastRunAt)
if err != nil {
return false
}
return lastRun.Year() == now.Year() && lastRun.Month() == now.Month() &&
lastRun.Day() == now.Day() && lastRun.Hour() == now.Hour() &&
lastRun.Minute() == now.Minute()
}
// matchOnce 匹配单次任务时间,格式 "2006-01-02 15:04"
func matchOnce(cronExpr string, now time.Time) bool {
t, err := time.ParseInLocation("2006-01-02 15:04", cronExpr, now.Location())
if err != nil {
return false
}
return t.Year() == now.Year() && t.Month() == now.Month() &&
t.Day() == now.Day() && t.Hour() == now.Hour() &&
t.Minute() == now.Minute()
}
// --- 存储 ---
func (s *Scheduler) schedulesFile(roomID string) string {
return filepath.Join(s.roomsDir, roomID, "schedules.json")
}
// LoadSchedules 从文件加载定时任务
func (s *Scheduler) LoadSchedules(roomID string) ([]Schedule, error) {
data, err := os.ReadFile(s.schedulesFile(roomID))
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
var schedules []Schedule
if err := json.Unmarshal(data, &schedules); err != nil {
return nil, err
}
return schedules, nil
}
// SaveSchedules 保存定时任务到文件
func (s *Scheduler) SaveSchedules(roomID string, schedules []Schedule) error {
data, err := json.MarshalIndent(schedules, "", " ")
if err != nil {
return err
}
return os.WriteFile(s.schedulesFile(roomID), data, 0644)
}
// AddSchedule 添加定时任务
func (s *Scheduler) AddSchedule(roomID string, sch Schedule) error {
schedules, _ := s.LoadSchedules(roomID)
sch.CreatedAt = time.Now().Format(time.RFC3339)
if sch.UserName == "" {
sch.UserName = "scheduler"
}
// 验证 cron 表达式
if !sch.Once {
if _, err := ParseCron(sch.Cron); err != nil {
return fmt.Errorf("无效的 cron 表达式: %w", err)
}
} else {
if _, err := time.ParseInLocation("2006-01-02 15:04", sch.Cron, time.Now().Location()); err != nil {
return fmt.Errorf("无效的单次时间格式,应为 YYYY-MM-DD HH:MM: %w", err)
}
}
schedules = append(schedules, sch)
return s.SaveSchedules(roomID, schedules)
}
// UpdateSchedule 更新定时任务
func (s *Scheduler) UpdateSchedule(roomID, scheduleID string, updated Schedule) error {
schedules, err := s.LoadSchedules(roomID)
if err != nil {
return err
}
for i := range schedules {
if schedules[i].ID == scheduleID {
// 验证 cron
if !updated.Once {
if _, err := ParseCron(updated.Cron); err != nil {
return fmt.Errorf("无效的 cron 表达式: %w", err)
}
}
updated.ID = scheduleID
updated.RoomID = roomID
updated.CreatedAt = schedules[i].CreatedAt
if updated.UserName == "" {
updated.UserName = schedules[i].UserName
}
schedules[i] = updated
return s.SaveSchedules(roomID, schedules)
}
}
return fmt.Errorf("schedule %s not found", scheduleID)
}
// DeleteSchedule 删除定时任务
func (s *Scheduler) DeleteSchedule(roomID, scheduleID string) error {
schedules, err := s.LoadSchedules(roomID)
if err != nil {
return err
}
for i := range schedules {
if schedules[i].ID == scheduleID {
schedules = append(schedules[:i], schedules[i+1:]...)
return s.SaveSchedules(roomID, schedules)
}
}
return fmt.Errorf("schedule %s not found", scheduleID)
}
// RunNow 手动立即触发一个定时任务
func (s *Scheduler) RunNow(ctx context.Context, roomID, scheduleID string) error {
schedules, err := s.LoadSchedules(roomID)
if err != nil {
return err
}
var sch *Schedule
for i := range schedules {
if schedules[i].ID == scheduleID {
sch = &schedules[i]
break
}
}
if sch == nil {
return fmt.Errorf("schedule %s not found", scheduleID)
}
s.mu.RLock()
room := s.rooms[roomID]
s.mu.RUnlock()
if room == nil {
return fmt.Errorf("room %s not found", roomID)
}
userName := sch.UserName
if userName == "" {
userName = "scheduler"
}
if s.Broadcast != nil {
s.Broadcast(roomID, "schedule_run", sch.Message, userName)
}
sch.LastRunAt = time.Now().Format(time.RFC3339)
s.SaveSchedules(roomID, schedules)
go func() {
if err := room.HandleUserMessage(ctx, userName, sch.Message); err != nil {
log.Printf("[scheduler] 手动执行失败: %v", err)
}
}()
return nil
}
// --- Cron 解析器 ---
// CronExpr 解析后的 cron 表达式
type CronExpr struct {
Minutes []int // 0-59, nil 表示任意
Hours []int // 0-23
Days []int // 1-31
Months []int // 1-12
Weekdays []int // 0-6 (0=Sunday)
}
// ParseCron 解析标准5字段 cron 表达式:分 时 日 月 周
func ParseCron(expr string) (CronExpr, error) {
fields := strings.Fields(expr)
if len(fields) != 5 {
return CronExpr{}, fmt.Errorf("cron 表达式需要5个字段得到 %d 个", len(fields))
}
minutes, err := parseField(fields[0], 0, 59)
if err != nil {
return CronExpr{}, fmt.Errorf("分钟字段错误: %w", err)
}
hours, err := parseField(fields[1], 0, 23)
if err != nil {
return CronExpr{}, fmt.Errorf("小时字段错误: %w", err)
}
days, err := parseField(fields[2], 1, 31)
if err != nil {
return CronExpr{}, fmt.Errorf("日期字段错误: %w", err)
}
months, err := parseField(fields[3], 1, 12)
if err != nil {
return CronExpr{}, fmt.Errorf("月份字段错误: %w", err)
}
weekdays, err := parseField(fields[4], 0, 6)
if err != nil {
return CronExpr{}, fmt.Errorf("星期字段错误: %w", err)
}
return CronExpr{
Minutes: minutes,
Hours: hours,
Days: days,
Months: months,
Weekdays: weekdays,
}, nil
}
// Match 检查时间是否匹配 cron 表达式
func (c CronExpr) Match(t time.Time) bool {
return contains(c.Minutes, t.Minute()) &&
contains(c.Hours, t.Hour()) &&
contains(c.Days, t.Day()) &&
contains(c.Months, int(t.Month())) &&
contains(c.Weekdays, int(t.Weekday()))
}
func contains(vals []int, v int) bool {
if vals == nil {
return true // nil 表示 *(任意)
}
for _, x := range vals {
if x == v {
return true
}
}
return false
}
// parseField 解析单个 cron 字段
// 支持: *、5、1,3,5、1-5、*/10、1-5/2
func parseField(field string, min, max int) ([]int, error) {
if field == "*" {
return nil, nil // nil 表示任意
}
var result []int
parts := strings.Split(field, ",")
for _, part := range parts {
vals, err := parsePart(part, min, max)
if err != nil {
return nil, err
}
result = append(result, vals...)
}
if len(result) == 0 {
return nil, fmt.Errorf("空字段")
}
return result, nil
}
func parsePart(part string, min, max int) ([]int, error) {
// 处理步进 */N 或 M-N/S
step := 1
if idx := strings.Index(part, "/"); idx >= 0 {
s, err := strconv.Atoi(part[idx+1:])
if err != nil || s <= 0 {
return nil, fmt.Errorf("无效步进值: %s", part)
}
step = s
part = part[:idx]
}
var start, end int
if part == "*" {
start, end = min, max
} else if idx := strings.Index(part, "-"); idx >= 0 {
var err error
start, err = strconv.Atoi(part[:idx])
if err != nil {
return nil, fmt.Errorf("无效范围: %s", part)
}
end, err = strconv.Atoi(part[idx+1:])
if err != nil {
return nil, fmt.Errorf("无效范围: %s", part)
}
} else {
v, err := strconv.Atoi(part)
if err != nil {
return nil, fmt.Errorf("无效值: %s", part)
}
if v < min || v > max {
return nil, fmt.Errorf("值 %d 超出范围 [%d, %d]", v, min, max)
}
if step == 1 {
return []int{v}, nil
}
start, end = v, max
}
if start < min || end > max || start > end {
return nil, fmt.Errorf("范围 %d-%d 无效,允许 [%d, %d]", start, end, min, max)
}
var vals []int
for i := start; i <= end; i += step {
vals = append(vals, i)
}
return vals, nil
}