// Package scheduler fires cron-style and one-shot messages into rooms.
package scheduler

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	// defaultUserName is the virtual user a schedule posts as when it does
	// not name one.
	defaultUserName = "scheduler"

	// onceLayout is the time layout a one-shot schedule keeps in its Cron
	// field.
	onceLayout = "2006-01-02 15:04"

	// tickInterval is how often Start looks for due schedules.
	tickInterval = 30 * time.Second
)

// Schedule describes one scheduled task as persisted in a room's
// schedules.json file.
type Schedule struct {
	ID     string `json:"id"`
	RoomID string `json:"room_id"`
	// Cron is a 5-field cron expression such as "0 8 * * *". For one-shot
	// schedules (Once == true) it instead holds a local time formatted as
	// "YYYY-MM-DD HH:MM".
	Cron      string `json:"cron"`
	Message   string `json:"message"`   // message text injected into the room
	UserName  string `json:"user_name"` // virtual user name; "scheduler" when empty
	Enabled   bool   `json:"enabled"`
	Once      bool   `json:"once"` // true for a one-shot schedule
	CreatedAt string `json:"created_at"`
	LastRunAt string `json:"last_run_at"`
}

// RoomHandler is the room-side interface the scheduler delivers messages to.
type RoomHandler interface {
	HandleUserMessage(ctx context.Context, userName, msg string) error
}

// BroadcastFunc is the callback used to announce that a schedule has fired.
type BroadcastFunc func(roomID string, eventType, message, userName string)

// ScheduleStore is the optional storage backend for schedules. When
// Scheduler.Store is set, the periodic check reads from it instead of the
// per-room JSON files.
type ScheduleStore interface {
	GetAllEnabled() ([]StoreSchedule, error)
	UpdateLastRun(id, lastRunAt string) error
	DisableSchedule(id string) error
}

// StoreSchedule is the schedule record returned by a ScheduleStore.
type StoreSchedule struct {
	ID        string
	RoomID    string
	Cron      string
	Message   string
	UserName  string
	Enabled   bool
	Once      bool
	CreatedAt string
	LastRunAt string
}

// Scheduler periodically checks schedules and injects due messages into
// their rooms.
type Scheduler struct {
	rooms     map[string]RoomHandler
	roomsDir  string
	mu        sync.RWMutex // guards rooms and cancel
	cancel    context.CancelFunc
	Broadcast BroadcastFunc
	Store     ScheduleStore
}

// New creates a Scheduler whose file-based schedules live under roomsDir.
func New(roomsDir string) *Scheduler {
	return &Scheduler{
		rooms:    make(map[string]RoomHandler),
		roomsDir: roomsDir,
	}
}

// SetRooms replaces the room lookup table used to deliver messages.
func (s *Scheduler) SetRooms(rooms map[string]RoomHandler) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rooms = rooms
}

// Start runs the scheduling loop: one check immediately, then one every 30
// seconds. It blocks until ctx is cancelled or Stop is called.
func (s *Scheduler) Start(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Stop is normally called from another goroutine, so publish the cancel
	// function under the lock.
	s.mu.Lock()
	s.cancel = cancel
	s.mu.Unlock()

	ticker := time.NewTicker(tickInterval)
	defer ticker.Stop()

	log.Println("[scheduler] 调度器已启动,每 30 秒检查一次")

	// Check once right away instead of waiting for the first tick.
	s.tick(ctx)

	for {
		select {
		case <-ctx.Done():
			log.Println("[scheduler] 调度器已停止")
			return
		case <-ticker.C:
			s.tick(ctx)
		}
	}
}

// Stop cancels the loop started by Start. It is a no-op if Start has not
// been called.
func (s *Scheduler) Stop() {
	s.mu.RLock()
	cancel := s.cancel
	s.mu.RUnlock()

	if cancel != nil {
		cancel()
	}
}

// tick runs every schedule that is due at the current time. It delegates to
// the Store when one is configured; otherwise it scans each room directory
// under roomsDir for a schedules.json file.
func (s *Scheduler) tick(ctx context.Context) {
	now := time.Now()

	// The store takes precedence over the JSON files.
	if s.Store != nil {
		s.tickFromStore(ctx, now)
		return
	}

	entries, err := os.ReadDir(s.roomsDir)
	if err != nil {
		// Best effort: an unreadable rooms directory just means nothing to run.
		return
	}

	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		roomID := e.Name()

		schedules, err := s.LoadSchedules(roomID)
		if err != nil || len(schedules) == 0 {
			continue
		}

		changed := false
		for i := range schedules {
			sch := &schedules[i]
			if !sch.Enabled || !isDue(sch.Cron, sch.Once, sch.LastRunAt, now) {
				continue
			}
			if !s.dispatch(ctx, roomID, sch.ID, sch.UserName, sch.Message) {
				continue
			}

			sch.LastRunAt = now.Format(time.RFC3339)
			changed = true
			if sch.Once {
				sch.Enabled = false
				log.Printf("[scheduler] 单次任务已完成并禁用: %s", sch.ID)
			}
		}

		if !changed {
			continue
		}
		if err := s.SaveSchedules(roomID, schedules); err != nil {
			// Without the persisted LastRunAt the schedule may fire again on
			// the next tick, so make the failure visible.
			log.Printf("[scheduler] 保存定时任务失败: room=%s, err=%v", roomID, err)
		}
	}
}

// tickFromStore runs every due schedule returned by the Store, then records
// the run time and disables one-shot schedules through the Store.
func (s *Scheduler) tickFromStore(ctx context.Context, now time.Time) {
	all, err := s.Store.GetAllEnabled()
	if err != nil {
		log.Printf("[scheduler] 读取定时任务失败: %v", err)
		return
	}

	for _, sch := range all {
		if !isDue(sch.Cron, sch.Once, sch.LastRunAt, now) {
			continue
		}
		if !s.dispatch(ctx, sch.RoomID, sch.ID, sch.UserName, sch.Message) {
			continue
		}

		if err := s.Store.UpdateLastRun(sch.ID, now.Format(time.RFC3339)); err != nil {
			log.Printf("[scheduler] 更新执行时间失败: schedule=%s, err=%v", sch.ID, err)
		}
		if !sch.Once {
			continue
		}
		if err := s.Store.DisableSchedule(sch.ID); err != nil {
			log.Printf("[scheduler] 禁用单次任务失败: schedule=%s, err=%v", sch.ID, err)
			continue
		}
		log.Printf("[scheduler] 单次任务已完成并禁用: %s", sch.ID)
	}
}

// isDue reports whether a schedule matches now and has not already run
// during the current minute. A cron expression that fails to parse is never
// due.
func isDue(spec string, once bool, lastRunAt string, now time.Time) bool {
	if once {
		if !matchOnce(spec, now) {
			return false
		}
	} else {
		expr, err := ParseCron(spec)
		if err != nil || !expr.Match(now) {
			return false
		}
	}
	return !sameMinute(lastRunAt, now)
}

// dispatch announces a schedule run and hands the message to the room in a
// new goroutine. It returns false, doing nothing, when roomID has no
// registered handler.
func (s *Scheduler) dispatch(ctx context.Context, roomID, scheduleID, userName, message string) bool {
	s.mu.RLock()
	room := s.rooms[roomID]
	s.mu.RUnlock()
	if room == nil {
		return false
	}

	if userName == "" {
		userName = defaultUserName
	}

	log.Printf("[scheduler] 触发定时任务: room=%s, schedule=%s, message=%s", roomID, scheduleID, message)
	if s.Broadcast != nil {
		s.Broadcast(roomID, "schedule_run", message, userName)
	}

	// Run the handler asynchronously so one slow room cannot delay the rest
	// of the tick.
	go func() {
		if err := room.HandleUserMessage(ctx, userName, message); err != nil {
			log.Printf("[scheduler] 执行失败: %v", err)
		}
	}()
	return true
}

// sameMinute reports whether the RFC 3339 timestamp lastRunAt falls in the
// same minute as now. An empty or unparsable timestamp yields false.
func sameMinute(lastRunAt string, now time.Time) bool {
	if lastRunAt == "" {
		return false
	}
	lastRun, err := time.Parse(time.RFC3339, lastRunAt)
	if err != nil {
		return false
	}
	// Compare instants rather than wall-clock fields: the stored timestamp
	// may carry a different zone offset than now (for example UTC).
	return lastRun.Truncate(time.Minute).Equal(now.Truncate(time.Minute))
}

// matchOnce reports whether the one-shot time spec, formatted as
// "2006-01-02 15:04" and interpreted in now's location, names the same
// minute as now.
func matchOnce(cronExpr string, now time.Time) bool {
	t, err := time.ParseInLocation(onceLayout, cronExpr, now.Location())
	if err != nil {
		return false
	}
	if t.Year() != now.Year() || t.Month() != now.Month() || t.Day() != now.Day() {
		return false
	}
	return t.Hour() == now.Hour() && t.Minute() == now.Minute()
}

// --- Storage ---

// schedulesFile returns the path of the schedules file for roomID.
func (s *Scheduler) schedulesFile(roomID string) string {
	return filepath.Join(s.roomsDir, roomID, "schedules.json")
}

// LoadSchedules reads the schedules of roomID from its JSON file. A missing
// file is not an error and yields a nil slice.
func (s *Scheduler) LoadSchedules(roomID string) ([]Schedule, error) {
	data, err := os.ReadFile(s.schedulesFile(roomID))
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}

	var schedules []Schedule
	if err := json.Unmarshal(data, &schedules); err != nil {
		return nil, err
	}
	return schedules, nil
}

// SaveSchedules writes schedules to the JSON file of roomID, replacing its
// previous content.
func (s *Scheduler) SaveSchedules(roomID string, schedules []Schedule) error {
	data, err := json.MarshalIndent(schedules, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(s.schedulesFile(roomID), data, 0644)
}

// validateSpec checks the Cron field of a schedule: a 5-field cron
// expression for recurring schedules, a "YYYY-MM-DD HH:MM" local time for
// one-shot schedules.
func validateSpec(spec string, once bool) error {
	if once {
		if _, err := time.ParseInLocation(onceLayout, spec, time.Local); err != nil {
			return fmt.Errorf("无效的单次时间格式,应为 YYYY-MM-DD HH:MM: %w", err)
		}
		return nil
	}
	if _, err := ParseCron(spec); err != nil {
		return fmt.Errorf("无效的 cron 表达式: %w", err)
	}
	return nil
}

// AddSchedule validates sch and appends it to the schedules file of roomID.
// CreatedAt is set to the current time, an empty UserName defaults to
// "scheduler", and an empty RoomID is filled with roomID.
//
// It returns an error, leaving the file untouched, when the schedule is
// invalid or the existing file cannot be read or decoded.
func (s *Scheduler) AddSchedule(roomID string, sch Schedule) error {
	if err := validateSpec(sch.Cron, sch.Once); err != nil {
		return err
	}

	// Do not ignore a load failure: saving on top of an unreadable file
	// would discard every schedule it still holds.
	schedules, err := s.LoadSchedules(roomID)
	if err != nil {
		return err
	}

	sch.CreatedAt = time.Now().Format(time.RFC3339)
	if sch.UserName == "" {
		sch.UserName = defaultUserName
	}
	if sch.RoomID == "" {
		sch.RoomID = roomID
	}

	return s.SaveSchedules(roomID, append(schedules, sch))
}

// UpdateSchedule replaces the schedule scheduleID of roomID with updated.
// ID, RoomID and CreatedAt are kept from the stored entry, and an empty
// UserName keeps the stored name. It returns an error when the schedule does
// not exist or updated carries an invalid cron expression or one-shot time.
func (s *Scheduler) UpdateSchedule(roomID, scheduleID string, updated Schedule) error {
	schedules, err := s.LoadSchedules(roomID)
	if err != nil {
		return err
	}

	for i := range schedules {
		if schedules[i].ID != scheduleID {
			continue
		}

		if err := validateSpec(updated.Cron, updated.Once); err != nil {
			return err
		}

		updated.ID = scheduleID
		updated.RoomID = roomID
		updated.CreatedAt = schedules[i].CreatedAt
		if updated.UserName == "" {
			updated.UserName = schedules[i].UserName
		}
		schedules[i] = updated
		return s.SaveSchedules(roomID, schedules)
	}
	return fmt.Errorf("schedule %s not found", scheduleID)
}

// DeleteSchedule removes the schedule scheduleID from the file of roomID.
func (s *Scheduler) DeleteSchedule(roomID, scheduleID string) error {
	schedules, err := s.LoadSchedules(roomID)
	if err != nil {
		return err
	}

	for i := range schedules {
		if schedules[i].ID == scheduleID {
			schedules = append(schedules[:i], schedules[i+1:]...)
			return s.SaveSchedules(roomID, schedules)
		}
	}
	return fmt.Errorf("schedule %s not found", scheduleID)
}

// RunNow fires a schedule immediately, regardless of its cron expression or
// Enabled flag. The schedule is looked up in the room's JSON file; LastRunAt
// is updated there and the message is handed to the room in a new goroutine.
// It returns an error when the schedule or the room cannot be found.
func (s *Scheduler) RunNow(ctx context.Context, roomID, scheduleID string) error {
	schedules, err := s.LoadSchedules(roomID)
	if err != nil {
		return err
	}

	idx := -1
	for i := range schedules {
		if schedules[i].ID == scheduleID {
			idx = i
			break
		}
	}
	if idx < 0 {
		return fmt.Errorf("schedule %s not found", scheduleID)
	}

	s.mu.RLock()
	room := s.rooms[roomID]
	s.mu.RUnlock()
	if room == nil {
		return fmt.Errorf("room %s not found", roomID)
	}

	userName := schedules[idx].UserName
	if userName == "" {
		userName = defaultUserName
	}
	message := schedules[idx].Message

	if s.Broadcast != nil {
		s.Broadcast(roomID, "schedule_run", message, userName)
	}

	schedules[idx].LastRunAt = time.Now().Format(time.RFC3339)
	if err := s.SaveSchedules(roomID, schedules); err != nil {
		// Recording the run is best effort; the message is still delivered.
		log.Printf("[scheduler] 保存定时任务失败: room=%s, err=%v", roomID, err)
	}

	go func() {
		if err := room.HandleUserMessage(ctx, userName, message); err != nil {
			log.Printf("[scheduler] 手动执行失败: %v", err)
		}
	}()

	return nil
}

// --- Cron parser ---

// CronExpr is a parsed 5-field cron expression. A nil slice means the field
// was "*" and matches any value.
type CronExpr struct {
	Minutes  []int // 0-59
	Hours    []int // 0-23
	Days     []int // 1-31
	Months   []int // 1-12
	Weekdays []int // 0-6 (0 = Sunday)
}

// ParseCron parses a standard 5-field cron expression: minute, hour, day of
// month, month, day of week. Fields are separated by whitespace.
func ParseCron(expr string) (CronExpr, error) {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return CronExpr{}, fmt.Errorf("cron 表达式需要5个字段,得到 %d 个", len(fields))
	}

	// Error label and allowed value range for each field, in field order.
	specs := [5]struct {
		label  string
		lo, hi int
	}{
		{"分钟字段错误", 0, 59},
		{"小时字段错误", 0, 23},
		{"日期字段错误", 1, 31},
		{"月份字段错误", 1, 12},
		{"星期字段错误", 0, 6},
	}

	var parsed [5][]int
	for i, spec := range specs {
		vals, err := parseField(fields[i], spec.lo, spec.hi)
		if err != nil {
			return CronExpr{}, fmt.Errorf("%s: %w", spec.label, err)
		}
		parsed[i] = vals
	}

	return CronExpr{
		Minutes:  parsed[0],
		Hours:    parsed[1],
		Days:     parsed[2],
		Months:   parsed[3],
		Weekdays: parsed[4],
	}, nil
}

// Match reports whether t satisfies the expression. All five fields must
// match, including when both day of month and day of week are restricted.
func (c CronExpr) Match(t time.Time) bool {
	return contains(c.Minutes, t.Minute()) &&
		contains(c.Hours, t.Hour()) &&
		contains(c.Days, t.Day()) &&
		contains(c.Months, int(t.Month())) &&
		contains(c.Weekdays, int(t.Weekday()))
}

// contains reports whether v is in vals. A nil vals stands for "*" and
// matches everything.
func contains(vals []int, v int) bool {
	if vals == nil {
		return true
	}
	for _, x := range vals {
		if x == v {
			return true
		}
	}
	return false
}

// parseField parses one cron field whose values must lie in [lo, hi].
// Supported forms: *, 5, 1,3,5, 1-5, */10, 1-5/2. A bare "*" yields a nil
// slice, meaning "any value".
func parseField(field string, lo, hi int) ([]int, error) {
	if field == "*" {
		return nil, nil
	}

	var result []int
	for _, part := range strings.Split(field, ",") {
		vals, err := parsePart(part, lo, hi)
		if err != nil {
			return nil, err
		}
		result = append(result, vals...)
	}

	if len(result) == 0 {
		return nil, fmt.Errorf("空字段")
	}
	return result, nil
}

// parsePart expands one comma-separated element of a cron field into the
// values it covers. The element is a value, a range M-N or "*", optionally
// followed by a step "/S". A single value with a step, such as "5/15", runs
// from that value up to hi.
func parsePart(part string, lo, hi int) ([]int, error) {
	// Split off the step of "*/N", "M-N/S" or "M/S".
	step := 1
	if idx := strings.Index(part, "/"); idx >= 0 {
		n, err := strconv.Atoi(part[idx+1:])
		if err != nil || n <= 0 {
			return nil, fmt.Errorf("无效步进值: %s", part)
		}
		step = n
		part = part[:idx]
	}

	var start, end int
	if part == "*" {
		start, end = lo, hi
	} else if idx := strings.Index(part, "-"); idx >= 0 {
		var err error
		if start, err = strconv.Atoi(part[:idx]); err != nil {
			return nil, fmt.Errorf("无效范围: %s", part)
		}
		if end, err = strconv.Atoi(part[idx+1:]); err != nil {
			return nil, fmt.Errorf("无效范围: %s", part)
		}
	} else {
		v, err := strconv.Atoi(part)
		if err != nil {
			return nil, fmt.Errorf("无效值: %s", part)
		}
		if v < lo || v > hi {
			return nil, fmt.Errorf("值 %d 超出范围 [%d, %d]", v, lo, hi)
		}
		if step == 1 {
			return []int{v}, nil
		}
		start, end = v, hi
	}

	if start < lo || end > hi || start > end {
		return nil, fmt.Errorf("范围 %d-%d 无效,允许 [%d, %d]", start, end, lo, hi)
	}

	var vals []int
	for i := start; i <= end; i += step {
		vals = append(vals, i)
	}
	return vals, nil
}