Files
playground/task.go.bak
XuanLee-HEALER df39693dff commit
2025-11-10 15:30:21 +08:00

237 lines
5.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"
)
func main() {
taskManager, _ := NewTaskManager()
go taskManager.Run()
count := 1
for {
r := rand.Intn(10)
if r < 6 {
taskManager.CreateTask(TaskMetadata{
ID: strconv.Itoa(count),
CreatedAt: time.Now(),
UID: count,
ProjectID: count,
Status: true,
})
count += 1
} else if r < 8 {
st := rand.Intn(count) + 1
println("prepared to stop task", st)
taskManager.StopTask(strconv.Itoa(st))
} else if r == 9 {
allTasks := taskManager.GetTaskDetails()
bs, _ := json.MarshalIndent(allTasks, "", " ")
println(string(bs))
} else {
taskManager.Stop()
break
}
time.Sleep(5 * time.Second)
}
time.Sleep(10 * time.Minute)
}
type TaskMetadata struct {
ID string `json:"task_id"`
CreatedAt time.Time `json:"task_create_time"`
UID int `json:"uid"`
ProjectID int `json:"project_id"`
Status bool `json:"is_run"`
StopCause string `json:"stop_cause"`
}
type CommandType int
const (
CmdCreate CommandType = iota // 创建新任务
CmdStop // 停止任务
CmdQuery // 查询任务状态
)
type TaskCommand struct {
Type CommandType
Data any
}
// TaskManager 是后台任务的核心管理器
type TaskManager struct {
ctx context.Context
// commands 是一个只接收命令的管道channel用于接收外部的创建、停止和查询命令。
commands chan TaskCommand
// tasks 存储所有正在运行的任务。
tasks map[string]struct {
context.Context
context.CancelCauseFunc
}
rw sync.RWMutex
// tasksMetadata 存储所有任务的元数据,用于提供任务详情查询。
tasksMetadata map[string]TaskMetadata
rw1 sync.RWMutex
doneCh chan struct{}
close sync.Once
}
// NewTaskManager 创建并返回一个后台任务管理器实例及其命令管道。
// 外部通过 commands 通道向管理器发送命令。
func NewTaskManager() (*TaskManager, chan<- TaskCommand) {
// 实例化 TaskManager初始化 map 和 channel
tm := &TaskManager{
ctx: context.Background(),
commands: make(chan TaskCommand, 100), // 带有缓冲区的命令管道,避免阻塞
tasks: make(map[string]struct {
context.Context
context.CancelCauseFunc
}),
tasksMetadata: make(map[string]TaskMetadata),
doneCh: make(chan struct{}, 1),
}
// 返回管理器实例以及一个只写的命令管道
return tm, tm.commands
}
func (tm *TaskManager) Run() {
// 这个方法将在一个独立的 goroutine 中运行
// 负责从 tm.commands 通道读取命令并处理
for {
select {
case <-tm.doneCh:
return
case cmd, ok := <-tm.commands:
if !ok {
println("channel was closed")
return
}
// 根据命令类型调用私有方法处理
switch cmd.Type {
case CmdCreate:
if createTask, ok := cmd.Data.(TaskMetadata); ok {
go func(data any) {
tm.rw.Lock()
ctx, cancel := context.WithCancelCause(tm.ctx)
tm.tasks[createTask.ID] = struct {
context.Context
context.CancelCauseFunc
}{ctx, cancel}
tm.rw.Unlock()
tm.rw1.Lock()
tm.tasksMetadata[createTask.ID] = createTask
tm.rw1.Unlock()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("Task is running current user_id is %d\n", createTask.UID)
case <-ctx.Done():
fmt.Printf("Task is stopped.\n")
return
}
}
}(cmd.Data)
}
case CmdStop:
if taskID, ok := cmd.Data.(string); ok {
tm.rw.Lock()
if task, ok := tm.tasks[taskID]; ok {
task.CancelCauseFunc(errors.New("force to cancel the task"))
tm.rw1.Lock()
meta := tm.tasksMetadata[taskID]
meta.Status = false
meta.StopCause = context.Cause(task.Context).Error()
tm.tasksMetadata[taskID] = meta
tm.rw1.Unlock()
delete(tm.tasks, taskID)
}
tm.rw.Unlock()
}
case CmdQuery:
tm.GetTaskDetails()
}
}
}
}
// CreateTask 封装了创建任务的命令,并发送给管理器
func (tm *TaskManager) CreateTask(metadata TaskMetadata) {
tm.commands <- TaskCommand{
Type: CmdCreate,
Data: metadata,
}
}
// StopTask 封装了停止任务的命令通过任务ID来指定
func (tm *TaskManager) StopTask(taskID string) {
tm.commands <- TaskCommand{
Type: CmdStop,
Data: taskID,
}
}
// GetTaskDetails 返回当前所有正在运行任务的元数据列表
func (tm *TaskManager) GetTaskDetails() []TaskMetadata {
tm.rw1.RLock()
defer tm.rw1.RUnlock()
var details []TaskMetadata
for _, metadata := range tm.tasksMetadata {
details = append(details, metadata)
}
return details
}
// Stop 关闭任务管理器
// 先拒绝新任务,然后关闭所有当前任务,关闭一个任务就删除一条元数据
// 调用此方法默认关闭发送管道
func (tm *TaskManager) Stop() {
tm.close.Do(func() {
// 1. 发送信号给 Run 方法,使其退出
tm.doneCh <- struct{}{}
// 2. 遍历并关闭所有正在运行的任务
tm.rw.RLock()
for id, task := range tm.tasks {
fmt.Printf("Shutting down task %s...\n", id)
task.CancelCauseFunc(errors.New("task manager is shutting down"))
tm.rw1.Lock()
meta := tm.tasksMetadata[id]
meta.Status = false
meta.StopCause = context.Cause(task.Context).Error()
tm.tasksMetadata[id] = meta
tm.rw1.Unlock()
}
tm.rw.RUnlock()
// 4. 清空任务
tm.rw.Lock()
for k := range tm.tasks {
delete(tm.tasks, k)
}
tm.rw.Unlock()
fmt.Println("All tasks have been stopped and data has been cleared.")
// 5. 最后关闭 doneCh 和 commands 管道,以释放资源
close(tm.doneCh)
close(tm.commands)
})
}