- 数据层:messages 表增加 part_type 字段,新建 file_versions 表支持版本追踪 - 后端:saveWorkspace 版本追踪、saveAgentOutput 源头分离、generateBriefMessage 成员简报 - 后端:applyDocumentEdit 增量编辑、buildWorkflowStep phase-aware 工作流引擎 - API:文件版本查询/回退接口 - 前端:part_type 驱动渲染,产物面板版本历史 - 新增写手团队(主编/搜索员/策划编辑/合规审查员)配置 - store 模块、scheduler 模块、web-search skill Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
569 lines
13 KiB
Go
569 lines
13 KiB
Go
package scheduler
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// Schedule 定时任务定义
|
||
type Schedule struct {
|
||
ID string `json:"id"`
|
||
RoomID string `json:"room_id"`
|
||
Cron string `json:"cron"` // cron 表达式,如 "0 8 * * *";单次任务用 "once YYYY-MM-DD HH:MM"
|
||
Message string `json:"message"` // 注入的消息内容
|
||
UserName string `json:"user_name"` // 虚拟用户名,默认 "scheduler"
|
||
Enabled bool `json:"enabled"`
|
||
Once bool `json:"once"` // 是否为单次任务
|
||
CreatedAt string `json:"created_at"`
|
||
LastRunAt string `json:"last_run_at"`
|
||
}
|
||
|
||
// RoomHandler 房间接口,供 scheduler 调用
|
||
type RoomHandler interface {
|
||
HandleUserMessage(ctx context.Context, userName, msg string) error
|
||
}
|
||
|
||
// BroadcastFunc 事件广播回调
|
||
type BroadcastFunc func(roomID string, eventType, message, userName string)
|
||
|
||
// ScheduleStore 定时任务存储接口
|
||
type ScheduleStore interface {
|
||
GetAllEnabled() ([]StoreSchedule, error)
|
||
UpdateLastRun(id, lastRunAt string) error
|
||
DisableSchedule(id string) error
|
||
}
|
||
|
||
// StoreSchedule 从 store 包导入的 Schedule 结构
|
||
type StoreSchedule struct {
|
||
ID string
|
||
RoomID string
|
||
Cron string
|
||
Message string
|
||
UserName string
|
||
Enabled bool
|
||
Once bool
|
||
CreatedAt string
|
||
LastRunAt string
|
||
}
|
||
|
||
// Scheduler 定时任务调度器
|
||
type Scheduler struct {
|
||
rooms map[string]RoomHandler
|
||
roomsDir string
|
||
mu sync.RWMutex
|
||
cancel context.CancelFunc
|
||
Broadcast BroadcastFunc
|
||
Store ScheduleStore
|
||
}
|
||
|
||
// New 创建调度器
|
||
func New(roomsDir string) *Scheduler {
|
||
return &Scheduler{
|
||
rooms: make(map[string]RoomHandler),
|
||
roomsDir: roomsDir,
|
||
}
|
||
}
|
||
|
||
// SetRooms 注入 room 引用
|
||
func (s *Scheduler) SetRooms(rooms map[string]RoomHandler) {
|
||
s.mu.Lock()
|
||
defer s.mu.Unlock()
|
||
s.rooms = rooms
|
||
}
|
||
|
||
// Start 启动调度器,每 30 秒检查一次
|
||
func (s *Scheduler) Start(ctx context.Context) {
|
||
ctx, s.cancel = context.WithCancel(ctx)
|
||
ticker := time.NewTicker(30 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
log.Println("[scheduler] 调度器已启动,每 30 秒检查一次")
|
||
|
||
// 启动时立即检查一次
|
||
s.tick(ctx)
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
log.Println("[scheduler] 调度器已停止")
|
||
return
|
||
case <-ticker.C:
|
||
s.tick(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Stop 优雅关闭
|
||
func (s *Scheduler) Stop() {
|
||
if s.cancel != nil {
|
||
s.cancel()
|
||
}
|
||
}
|
||
|
||
// tick 检查所有房间的定时任务
|
||
func (s *Scheduler) tick(ctx context.Context) {
|
||
now := time.Now()
|
||
|
||
// 优先使用 Store
|
||
if s.Store != nil {
|
||
s.tickFromStore(ctx, now)
|
||
return
|
||
}
|
||
|
||
entries, err := os.ReadDir(s.roomsDir)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
for _, e := range entries {
|
||
if !e.IsDir() {
|
||
continue
|
||
}
|
||
roomID := e.Name()
|
||
schedules, err := s.LoadSchedules(roomID)
|
||
if err != nil || len(schedules) == 0 {
|
||
continue
|
||
}
|
||
|
||
changed := false
|
||
for i := range schedules {
|
||
sch := &schedules[i]
|
||
if !sch.Enabled {
|
||
continue
|
||
}
|
||
|
||
matched := false
|
||
if sch.Once {
|
||
matched = matchOnce(sch.Cron, now)
|
||
} else {
|
||
expr, err := ParseCron(sch.Cron)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
matched = expr.Match(now)
|
||
}
|
||
|
||
if !matched {
|
||
continue
|
||
}
|
||
|
||
if sameMinute(sch.LastRunAt, now) {
|
||
continue
|
||
}
|
||
|
||
s.mu.RLock()
|
||
room := s.rooms[roomID]
|
||
s.mu.RUnlock()
|
||
|
||
if room == nil {
|
||
continue
|
||
}
|
||
|
||
userName := sch.UserName
|
||
if userName == "" {
|
||
userName = "scheduler"
|
||
}
|
||
|
||
log.Printf("[scheduler] 触发定时任务: room=%s, schedule=%s, message=%s", roomID, sch.ID, sch.Message)
|
||
|
||
if s.Broadcast != nil {
|
||
s.Broadcast(roomID, "schedule_run", sch.Message, userName)
|
||
}
|
||
|
||
go func(r RoomHandler, un, msg string) {
|
||
if err := r.HandleUserMessage(ctx, un, msg); err != nil {
|
||
log.Printf("[scheduler] 执行失败: %v", err)
|
||
}
|
||
}(room, userName, sch.Message)
|
||
|
||
sch.LastRunAt = now.Format(time.RFC3339)
|
||
changed = true
|
||
|
||
if sch.Once {
|
||
sch.Enabled = false
|
||
log.Printf("[scheduler] 单次任务已完成并禁用: %s", sch.ID)
|
||
}
|
||
}
|
||
|
||
if changed {
|
||
s.SaveSchedules(roomID, schedules)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Scheduler) tickFromStore(ctx context.Context, now time.Time) {
|
||
all, err := s.Store.GetAllEnabled()
|
||
if err != nil || len(all) == 0 {
|
||
return
|
||
}
|
||
|
||
for _, sch := range all {
|
||
matched := false
|
||
if sch.Once {
|
||
matched = matchOnce(sch.Cron, now)
|
||
} else {
|
||
expr, err := ParseCron(sch.Cron)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
matched = expr.Match(now)
|
||
}
|
||
|
||
if !matched {
|
||
continue
|
||
}
|
||
|
||
if sameMinute(sch.LastRunAt, now) {
|
||
continue
|
||
}
|
||
|
||
s.mu.RLock()
|
||
room := s.rooms[sch.RoomID]
|
||
s.mu.RUnlock()
|
||
|
||
if room == nil {
|
||
continue
|
||
}
|
||
|
||
userName := sch.UserName
|
||
if userName == "" {
|
||
userName = "scheduler"
|
||
}
|
||
|
||
log.Printf("[scheduler] 触发定时任务: room=%s, schedule=%s, message=%s", sch.RoomID, sch.ID, sch.Message)
|
||
|
||
if s.Broadcast != nil {
|
||
s.Broadcast(sch.RoomID, "schedule_run", sch.Message, userName)
|
||
}
|
||
|
||
go func(r RoomHandler, un, msg string) {
|
||
if err := r.HandleUserMessage(ctx, un, msg); err != nil {
|
||
log.Printf("[scheduler] 执行失败: %v", err)
|
||
}
|
||
}(room, userName, sch.Message)
|
||
|
||
s.Store.UpdateLastRun(sch.ID, now.Format(time.RFC3339))
|
||
|
||
if sch.Once {
|
||
s.Store.DisableSchedule(sch.ID)
|
||
log.Printf("[scheduler] 单次任务已完成并禁用: %s", sch.ID)
|
||
}
|
||
}
|
||
}
|
||
|
||
func sameMinute(lastRunAt string, now time.Time) bool {
|
||
if lastRunAt == "" {
|
||
return false
|
||
}
|
||
lastRun, err := time.Parse(time.RFC3339, lastRunAt)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
return lastRun.Year() == now.Year() && lastRun.Month() == now.Month() &&
|
||
lastRun.Day() == now.Day() && lastRun.Hour() == now.Hour() &&
|
||
lastRun.Minute() == now.Minute()
|
||
}
|
||
|
||
// matchOnce 匹配单次任务时间,格式 "2006-01-02 15:04"
|
||
func matchOnce(cronExpr string, now time.Time) bool {
|
||
t, err := time.ParseInLocation("2006-01-02 15:04", cronExpr, now.Location())
|
||
if err != nil {
|
||
return false
|
||
}
|
||
return t.Year() == now.Year() && t.Month() == now.Month() &&
|
||
t.Day() == now.Day() && t.Hour() == now.Hour() &&
|
||
t.Minute() == now.Minute()
|
||
}
|
||
|
||
// --- 存储 ---
|
||
|
||
func (s *Scheduler) schedulesFile(roomID string) string {
|
||
return filepath.Join(s.roomsDir, roomID, "schedules.json")
|
||
}
|
||
|
||
// LoadSchedules 从文件加载定时任务
|
||
func (s *Scheduler) LoadSchedules(roomID string) ([]Schedule, error) {
|
||
data, err := os.ReadFile(s.schedulesFile(roomID))
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
return nil, nil
|
||
}
|
||
return nil, err
|
||
}
|
||
var schedules []Schedule
|
||
if err := json.Unmarshal(data, &schedules); err != nil {
|
||
return nil, err
|
||
}
|
||
return schedules, nil
|
||
}
|
||
|
||
// SaveSchedules 保存定时任务到文件
|
||
func (s *Scheduler) SaveSchedules(roomID string, schedules []Schedule) error {
|
||
data, err := json.MarshalIndent(schedules, "", " ")
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return os.WriteFile(s.schedulesFile(roomID), data, 0644)
|
||
}
|
||
|
||
// AddSchedule 添加定时任务
|
||
func (s *Scheduler) AddSchedule(roomID string, sch Schedule) error {
|
||
schedules, _ := s.LoadSchedules(roomID)
|
||
sch.CreatedAt = time.Now().Format(time.RFC3339)
|
||
if sch.UserName == "" {
|
||
sch.UserName = "scheduler"
|
||
}
|
||
|
||
// 验证 cron 表达式
|
||
if !sch.Once {
|
||
if _, err := ParseCron(sch.Cron); err != nil {
|
||
return fmt.Errorf("无效的 cron 表达式: %w", err)
|
||
}
|
||
} else {
|
||
if _, err := time.ParseInLocation("2006-01-02 15:04", sch.Cron, time.Now().Location()); err != nil {
|
||
return fmt.Errorf("无效的单次时间格式,应为 YYYY-MM-DD HH:MM: %w", err)
|
||
}
|
||
}
|
||
|
||
schedules = append(schedules, sch)
|
||
return s.SaveSchedules(roomID, schedules)
|
||
}
|
||
|
||
// UpdateSchedule 更新定时任务
|
||
func (s *Scheduler) UpdateSchedule(roomID, scheduleID string, updated Schedule) error {
|
||
schedules, err := s.LoadSchedules(roomID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for i := range schedules {
|
||
if schedules[i].ID == scheduleID {
|
||
// 验证 cron
|
||
if !updated.Once {
|
||
if _, err := ParseCron(updated.Cron); err != nil {
|
||
return fmt.Errorf("无效的 cron 表达式: %w", err)
|
||
}
|
||
}
|
||
updated.ID = scheduleID
|
||
updated.RoomID = roomID
|
||
updated.CreatedAt = schedules[i].CreatedAt
|
||
if updated.UserName == "" {
|
||
updated.UserName = schedules[i].UserName
|
||
}
|
||
schedules[i] = updated
|
||
return s.SaveSchedules(roomID, schedules)
|
||
}
|
||
}
|
||
return fmt.Errorf("schedule %s not found", scheduleID)
|
||
}
|
||
|
||
// DeleteSchedule 删除定时任务
|
||
func (s *Scheduler) DeleteSchedule(roomID, scheduleID string) error {
|
||
schedules, err := s.LoadSchedules(roomID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
for i := range schedules {
|
||
if schedules[i].ID == scheduleID {
|
||
schedules = append(schedules[:i], schedules[i+1:]...)
|
||
return s.SaveSchedules(roomID, schedules)
|
||
}
|
||
}
|
||
return fmt.Errorf("schedule %s not found", scheduleID)
|
||
}
|
||
|
||
// RunNow 手动立即触发一个定时任务
|
||
func (s *Scheduler) RunNow(ctx context.Context, roomID, scheduleID string) error {
|
||
schedules, err := s.LoadSchedules(roomID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
var sch *Schedule
|
||
for i := range schedules {
|
||
if schedules[i].ID == scheduleID {
|
||
sch = &schedules[i]
|
||
break
|
||
}
|
||
}
|
||
if sch == nil {
|
||
return fmt.Errorf("schedule %s not found", scheduleID)
|
||
}
|
||
|
||
s.mu.RLock()
|
||
room := s.rooms[roomID]
|
||
s.mu.RUnlock()
|
||
if room == nil {
|
||
return fmt.Errorf("room %s not found", roomID)
|
||
}
|
||
|
||
userName := sch.UserName
|
||
if userName == "" {
|
||
userName = "scheduler"
|
||
}
|
||
|
||
if s.Broadcast != nil {
|
||
s.Broadcast(roomID, "schedule_run", sch.Message, userName)
|
||
}
|
||
|
||
sch.LastRunAt = time.Now().Format(time.RFC3339)
|
||
s.SaveSchedules(roomID, schedules)
|
||
|
||
go func() {
|
||
if err := room.HandleUserMessage(ctx, userName, sch.Message); err != nil {
|
||
log.Printf("[scheduler] 手动执行失败: %v", err)
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// --- Cron 解析器 ---
|
||
|
||
// CronExpr 解析后的 cron 表达式
|
||
type CronExpr struct {
|
||
Minutes []int // 0-59, nil 表示任意
|
||
Hours []int // 0-23
|
||
Days []int // 1-31
|
||
Months []int // 1-12
|
||
Weekdays []int // 0-6 (0=Sunday)
|
||
}
|
||
|
||
// ParseCron 解析标准5字段 cron 表达式:分 时 日 月 周
|
||
func ParseCron(expr string) (CronExpr, error) {
|
||
fields := strings.Fields(expr)
|
||
if len(fields) != 5 {
|
||
return CronExpr{}, fmt.Errorf("cron 表达式需要5个字段,得到 %d 个", len(fields))
|
||
}
|
||
|
||
minutes, err := parseField(fields[0], 0, 59)
|
||
if err != nil {
|
||
return CronExpr{}, fmt.Errorf("分钟字段错误: %w", err)
|
||
}
|
||
hours, err := parseField(fields[1], 0, 23)
|
||
if err != nil {
|
||
return CronExpr{}, fmt.Errorf("小时字段错误: %w", err)
|
||
}
|
||
days, err := parseField(fields[2], 1, 31)
|
||
if err != nil {
|
||
return CronExpr{}, fmt.Errorf("日期字段错误: %w", err)
|
||
}
|
||
months, err := parseField(fields[3], 1, 12)
|
||
if err != nil {
|
||
return CronExpr{}, fmt.Errorf("月份字段错误: %w", err)
|
||
}
|
||
weekdays, err := parseField(fields[4], 0, 6)
|
||
if err != nil {
|
||
return CronExpr{}, fmt.Errorf("星期字段错误: %w", err)
|
||
}
|
||
|
||
return CronExpr{
|
||
Minutes: minutes,
|
||
Hours: hours,
|
||
Days: days,
|
||
Months: months,
|
||
Weekdays: weekdays,
|
||
}, nil
|
||
}
|
||
|
||
// Match 检查时间是否匹配 cron 表达式
|
||
func (c CronExpr) Match(t time.Time) bool {
|
||
return contains(c.Minutes, t.Minute()) &&
|
||
contains(c.Hours, t.Hour()) &&
|
||
contains(c.Days, t.Day()) &&
|
||
contains(c.Months, int(t.Month())) &&
|
||
contains(c.Weekdays, int(t.Weekday()))
|
||
}
|
||
|
||
func contains(vals []int, v int) bool {
|
||
if vals == nil {
|
||
return true // nil 表示 *(任意)
|
||
}
|
||
for _, x := range vals {
|
||
if x == v {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// parseField 解析单个 cron 字段
|
||
// 支持: *、5、1,3,5、1-5、*/10、1-5/2
|
||
func parseField(field string, min, max int) ([]int, error) {
|
||
if field == "*" {
|
||
return nil, nil // nil 表示任意
|
||
}
|
||
|
||
var result []int
|
||
parts := strings.Split(field, ",")
|
||
for _, part := range parts {
|
||
vals, err := parsePart(part, min, max)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
result = append(result, vals...)
|
||
}
|
||
|
||
if len(result) == 0 {
|
||
return nil, fmt.Errorf("空字段")
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
func parsePart(part string, min, max int) ([]int, error) {
|
||
// 处理步进 */N 或 M-N/S
|
||
step := 1
|
||
if idx := strings.Index(part, "/"); idx >= 0 {
|
||
s, err := strconv.Atoi(part[idx+1:])
|
||
if err != nil || s <= 0 {
|
||
return nil, fmt.Errorf("无效步进值: %s", part)
|
||
}
|
||
step = s
|
||
part = part[:idx]
|
||
}
|
||
|
||
var start, end int
|
||
if part == "*" {
|
||
start, end = min, max
|
||
} else if idx := strings.Index(part, "-"); idx >= 0 {
|
||
var err error
|
||
start, err = strconv.Atoi(part[:idx])
|
||
if err != nil {
|
||
return nil, fmt.Errorf("无效范围: %s", part)
|
||
}
|
||
end, err = strconv.Atoi(part[idx+1:])
|
||
if err != nil {
|
||
return nil, fmt.Errorf("无效范围: %s", part)
|
||
}
|
||
} else {
|
||
v, err := strconv.Atoi(part)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("无效值: %s", part)
|
||
}
|
||
if v < min || v > max {
|
||
return nil, fmt.Errorf("值 %d 超出范围 [%d, %d]", v, min, max)
|
||
}
|
||
if step == 1 {
|
||
return []int{v}, nil
|
||
}
|
||
start, end = v, max
|
||
}
|
||
|
||
if start < min || end > max || start > end {
|
||
return nil, fmt.Errorf("范围 %d-%d 无效,允许 [%d, %d]", start, end, min, max)
|
||
}
|
||
|
||
var vals []int
|
||
for i := start; i <= end; i += step {
|
||
vals = append(vals, i)
|
||
}
|
||
return vals, nil
|
||
}
|