package main import ( "context" "encoding/json" "errors" "fmt" "math/rand" "strconv" "sync" "time" ) func main() { taskManager, _ := NewTaskManager() go taskManager.Run() count := 1 for { r := rand.Intn(10) if r < 6 { taskManager.CreateTask(TaskMetadata{ ID: strconv.Itoa(count), CreatedAt: time.Now(), UID: count, ProjectID: count, Status: true, }) count += 1 } else if r < 8 { st := rand.Intn(count) + 1 println("prepared to stop task", st) taskManager.StopTask(strconv.Itoa(st)) } else if r == 9 { allTasks := taskManager.GetTaskDetails() bs, _ := json.MarshalIndent(allTasks, "", " ") println(string(bs)) } else { taskManager.Stop() break } time.Sleep(5 * time.Second) } time.Sleep(10 * time.Minute) } type TaskMetadata struct { ID string `json:"task_id"` CreatedAt time.Time `json:"task_create_time"` UID int `json:"uid"` ProjectID int `json:"project_id"` Status bool `json:"is_run"` StopCause string `json:"stop_cause"` } type CommandType int const ( CmdCreate CommandType = iota // 创建新任务 CmdStop // 停止任务 CmdQuery // 查询任务状态 ) type TaskCommand struct { Type CommandType Data any } // TaskManager 是后台任务的核心管理器 type TaskManager struct { ctx context.Context // commands 是一个只接收命令的管道(channel),用于接收外部的创建、停止和查询命令。 commands chan TaskCommand // tasks 存储所有正在运行的任务。 tasks map[string]struct { context.Context context.CancelCauseFunc } rw sync.RWMutex // tasksMetadata 存储所有任务的元数据,用于提供任务详情查询。 tasksMetadata map[string]TaskMetadata rw1 sync.RWMutex doneCh chan struct{} close sync.Once } // NewTaskManager 创建并返回一个后台任务管理器实例及其命令管道。 // 外部通过 commands 通道向管理器发送命令。 func NewTaskManager() (*TaskManager, chan<- TaskCommand) { // 实例化 TaskManager,初始化 map 和 channel tm := &TaskManager{ ctx: context.Background(), commands: make(chan TaskCommand, 100), // 带有缓冲区的命令管道,避免阻塞 tasks: make(map[string]struct { context.Context context.CancelCauseFunc }), tasksMetadata: make(map[string]TaskMetadata), doneCh: make(chan struct{}, 1), } // 返回管理器实例以及一个只写的命令管道 return tm, tm.commands } func (tm *TaskManager) Run() { // 这个方法将在一个独立的 goroutine 中运行 // 负责从 tm.commands 通道读取命令并处理 for { select { case <-tm.doneCh: return case cmd, ok := <-tm.commands: if !ok { println("channel was closed") return } // 根据命令类型调用私有方法处理 switch cmd.Type { case CmdCreate: if createTask, ok := cmd.Data.(TaskMetadata); ok { go func(data any) { tm.rw.Lock() ctx, cancel := context.WithCancelCause(tm.ctx) tm.tasks[createTask.ID] = struct { context.Context context.CancelCauseFunc }{ctx, cancel} tm.rw.Unlock() tm.rw1.Lock() tm.tasksMetadata[createTask.ID] = createTask tm.rw1.Unlock() ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: fmt.Printf("Task is running current user_id is %d\n", createTask.UID) case <-ctx.Done(): fmt.Printf("Task is stopped.\n") return } } }(cmd.Data) } case CmdStop: if taskID, ok := cmd.Data.(string); ok { tm.rw.Lock() if task, ok := tm.tasks[taskID]; ok { task.CancelCauseFunc(errors.New("force to cancel the task")) tm.rw1.Lock() meta := tm.tasksMetadata[taskID] meta.Status = false meta.StopCause = context.Cause(task.Context).Error() tm.tasksMetadata[taskID] = meta tm.rw1.Unlock() delete(tm.tasks, taskID) } tm.rw.Unlock() } case CmdQuery: tm.GetTaskDetails() } } } } // CreateTask 封装了创建任务的命令,并发送给管理器 func (tm *TaskManager) CreateTask(metadata TaskMetadata) { tm.commands <- TaskCommand{ Type: CmdCreate, Data: metadata, } } // StopTask 封装了停止任务的命令,通过任务ID来指定 func (tm *TaskManager) StopTask(taskID string) { tm.commands <- TaskCommand{ Type: CmdStop, Data: taskID, } } // GetTaskDetails 返回当前所有正在运行任务的元数据列表 func (tm *TaskManager) GetTaskDetails() []TaskMetadata { tm.rw1.RLock() defer tm.rw1.RUnlock() var details []TaskMetadata for _, metadata := range tm.tasksMetadata { details = append(details, metadata) } return details } // Stop 关闭任务管理器 // 先拒绝新任务,然后关闭所有当前任务,关闭一个任务就删除一条元数据 // 调用此方法默认关闭发送管道 func (tm *TaskManager) Stop() { tm.close.Do(func() { // 1. 发送信号给 Run 方法,使其退出 tm.doneCh <- struct{}{} // 2. 遍历并关闭所有正在运行的任务 tm.rw.RLock() for id, task := range tm.tasks { fmt.Printf("Shutting down task %s...\n", id) task.CancelCauseFunc(errors.New("task manager is shutting down")) tm.rw1.Lock() meta := tm.tasksMetadata[id] meta.Status = false meta.StopCause = context.Cause(task.Context).Error() tm.tasksMetadata[id] = meta tm.rw1.Unlock() } tm.rw.RUnlock() // 4. 清空任务 tm.rw.Lock() for k := range tm.tasks { delete(tm.tasks, k) } tm.rw.Unlock() fmt.Println("All tasks have been stopped and data has been cleared.") // 5. 最后关闭 doneCh 和 commands 管道,以释放资源 close(tm.doneCh) close(tm.commands) }) }