mirror of
https://github.com/krau/SaveAny-Bot.git
synced 2026-06-26 01:31:29 +08:00
feat: add task command to client and Title method to Task for tasks queue managing, #157
This commit is contained in:
@@ -26,3 +26,20 @@ func handleCancelCallback(ctx *ext.Context, update *ext.Update) error {
|
||||
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
func handleCancelCmd(ctx *ext.Context, update *ext.Update) error {
|
||||
logger := log.FromContext(ctx)
|
||||
args := strings.Fields(update.EffectiveMessage.Text)
|
||||
if len(args) < 2 {
|
||||
ctx.Reply(update, ext.ReplyTextString("用法: /cancel <task_id>"), nil)
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
taskID := args[1]
|
||||
if err := core.CancelTask(ctx, taskID); err != nil {
|
||||
logger.Errorf("failed to cancel task %s: %v", taskID, err)
|
||||
ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
ctx.Reply(update, ext.ReplyTextString("已请求取消任务: "+taskID), nil)
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
@@ -28,6 +28,8 @@ var CommandHandlers = []DescCommandHandler{
|
||||
{"rule", "管理自动存储规则", handleRuleCmd},
|
||||
{"save", "保存文件", handleSilentMode(handleSaveCmd, handleSilentSaveReplied)},
|
||||
{"dl", "下载给定链接的文件", handleDlCmd},
|
||||
{"task", "管理任务队列", handleTaskCmd},
|
||||
{"cancel", "取消任务", handleCancelCmd},
|
||||
{"watch", "监听聊天(UserBot)", handleWatchCmd},
|
||||
{"unwatch", "取消监听聊天(UserBot)", handleUnwatchCmd},
|
||||
{"lswatch", "列出监听的聊天(UserBot)", handleLswatchCmd},
|
||||
|
||||
113
client/bot/handlers/tasks.go
Normal file
113
client/bot/handlers/tasks.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/celestix/gotgproto/dispatcher"
|
||||
"github.com/celestix/gotgproto/ext"
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/gotd/td/telegram/message/styling"
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
)
|
||||
|
||||
func handleTaskCmd(ctx *ext.Context, update *ext.Update) error {
|
||||
logger := log.FromContext(ctx)
|
||||
args := strings.Fields(update.EffectiveMessage.Text)
|
||||
if len(args) == 1 {
|
||||
showRunningTasks(ctx, update)
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
switch args[1] {
|
||||
case "running", "run", "r":
|
||||
showRunningTasks(ctx, update)
|
||||
case "queued", "queue", "q", "waiting":
|
||||
showQueuedTasks(ctx, update)
|
||||
case "cancel", "c":
|
||||
if len(args) < 3 {
|
||||
ctx.Reply(update, ext.ReplyTextString("用法: /tasks cancel <task_id>"), nil)
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
taskID := args[2]
|
||||
if err := core.CancelTask(ctx, taskID); err != nil {
|
||||
logger.Errorf("取消任务 %s 失败: %v", taskID, err)
|
||||
ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
ctx.Reply(update, ext.ReplyTextStyledTextArray([]styling.StyledTextOption{
|
||||
styling.Plain("已请求取消任务: "),
|
||||
styling.Code(taskID),
|
||||
}), nil)
|
||||
default:
|
||||
ctx.Reply(update, ext.ReplyTextString("用法: /tasks [running|queued|cancel <task_id>]"), nil)
|
||||
}
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
func showRunningTasks(ctx *ext.Context, update *ext.Update) {
|
||||
tasks := core.GetRunningTasks(ctx)
|
||||
if len(tasks) == 0 {
|
||||
ctx.Reply(update, ext.ReplyTextString("当前没有正在运行的任务"), nil)
|
||||
return
|
||||
}
|
||||
opts := make([]styling.StyledTextOption, 0, 2+len(tasks)*4)
|
||||
opts = append(opts,
|
||||
styling.Bold("当前正在运行的任务:"),
|
||||
styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
|
||||
)
|
||||
for _, t := range tasks {
|
||||
created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
|
||||
status := "运行中"
|
||||
if t.Cancelled {
|
||||
status = "已请求取消"
|
||||
}
|
||||
opts = append(opts,
|
||||
styling.Plain("\nID: "),
|
||||
styling.Code(t.ID),
|
||||
styling.Plain("\n名称: "),
|
||||
styling.Code(t.Title),
|
||||
styling.Plain("\n创建时间: "),
|
||||
styling.Code(created),
|
||||
styling.Plain("\n状态: "),
|
||||
styling.Code(status),
|
||||
)
|
||||
}
|
||||
ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
|
||||
}
|
||||
|
||||
func showQueuedTasks(ctx *ext.Context, update *ext.Update) {
|
||||
tasks := core.GetQueuedTasks(ctx)
|
||||
if len(tasks) == 0 {
|
||||
ctx.Reply(update, ext.ReplyTextString("当前没有排队中的任务"), nil)
|
||||
return
|
||||
}
|
||||
opts := make([]styling.StyledTextOption, 0, 2+len(tasks)*3)
|
||||
opts = append(opts,
|
||||
styling.Bold("当前排队中的任务:"),
|
||||
styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
|
||||
)
|
||||
for _, t := range tasks {
|
||||
created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
|
||||
status := "排队中"
|
||||
if t.Cancelled {
|
||||
status = "已请求取消"
|
||||
}
|
||||
opts = append(opts,
|
||||
styling.Plain("\nID: "),
|
||||
styling.Code(t.ID),
|
||||
styling.Plain("\n名称: "),
|
||||
styling.Code(t.Title),
|
||||
styling.Plain("\n创建时间: "),
|
||||
styling.Code(created),
|
||||
styling.Plain("\n状态: "),
|
||||
styling.Code(status),
|
||||
)
|
||||
if len(tasks) > 10 {
|
||||
opts = append(opts, styling.Plain("\n...\n只显示前 10 个任务, 共 "+fmt.Sprintf("%d", len(tasks))+" 个任务"))
|
||||
break
|
||||
}
|
||||
}
|
||||
ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
|
||||
}
|
||||
44
core/core.go
44
core/core.go
@@ -10,15 +10,16 @@ import (
|
||||
"github.com/krau/SaveAny-Bot/pkg/queue"
|
||||
)
|
||||
|
||||
var queueInstance *queue.TaskQueue[Exectable]
|
||||
var queueInstance *queue.TaskQueue[Executable]
|
||||
|
||||
type Exectable interface {
|
||||
type Executable interface {
|
||||
Type() tasktype.TaskType
|
||||
Title() string
|
||||
TaskID() string
|
||||
Execute(ctx context.Context) error
|
||||
}
|
||||
|
||||
func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan struct{}) {
|
||||
func worker(ctx context.Context, qe *queue.TaskQueue[Executable], semaphore chan struct{}) {
|
||||
logger := log.FromContext(ctx)
|
||||
execHooks := config.C().Hook.Exec
|
||||
for {
|
||||
@@ -28,27 +29,27 @@ func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan
|
||||
logger.Error("Failed to get task from queue:", err)
|
||||
break // queue closed and empty
|
||||
}
|
||||
task := qtask.Data
|
||||
logger.Infof("Processing task: %s", task.TaskID())
|
||||
exe := qtask.Data
|
||||
logger.Infof("Processing task: %s", exe.TaskID())
|
||||
if err := ExecCommandString(qtask.Context(), execHooks.TaskBeforeStart); err != nil {
|
||||
logger.Errorf("Failed to execute before start hook for task %s: %v", task.TaskID(), err)
|
||||
logger.Errorf("Failed to execute before start hook for task %s: %v", exe.TaskID(), err)
|
||||
}
|
||||
if err := task.Execute(qtask.Context()); err != nil {
|
||||
if err := exe.Execute(qtask.Context()); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logger.Infof("Task %s was canceled", task.TaskID())
|
||||
logger.Infof("Task %s was canceled", exe.TaskID())
|
||||
if err := ExecCommandString(ctx, execHooks.TaskCancel); err != nil {
|
||||
logger.Errorf("Failed to execute cancel hook for task %s: %v", task.TaskID(), err)
|
||||
logger.Errorf("Failed to execute cancel hook for task %s: %v", exe.TaskID(), err)
|
||||
}
|
||||
} else {
|
||||
logger.Errorf("Failed to execute task %s: %v", task.TaskID(), err)
|
||||
logger.Errorf("Failed to execute task %s: %v", exe.TaskID(), err)
|
||||
if err := ExecCommandString(ctx, execHooks.TaskFail); err != nil {
|
||||
logger.Errorf("Failed to execute fail hook for task %s: %v", task.TaskID(), err)
|
||||
logger.Errorf("Failed to execute fail hook for task %s: %v", exe.TaskID(), err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.Infof("Task %s completed successfully", task.TaskID())
|
||||
logger.Infof("Task %s completed successfully", exe.TaskID())
|
||||
if err := ExecCommandString(ctx, execHooks.TaskSuccess); err != nil {
|
||||
logger.Errorf("Failed to execute success hook for task %s: %v", task.TaskID(), err)
|
||||
logger.Errorf("Failed to execute success hook for task %s: %v", exe.TaskID(), err)
|
||||
}
|
||||
}
|
||||
qe.Done(qtask.ID)
|
||||
@@ -60,7 +61,7 @@ func Run(ctx context.Context) {
|
||||
log.FromContext(ctx).Info("Start processing tasks...")
|
||||
semaphore := make(chan struct{}, config.C().Workers)
|
||||
if queueInstance == nil {
|
||||
queueInstance = queue.NewTaskQueue[Exectable]()
|
||||
queueInstance = queue.NewTaskQueue[Executable]()
|
||||
}
|
||||
for range config.C().Workers {
|
||||
go worker(ctx, queueInstance, semaphore)
|
||||
@@ -68,8 +69,8 @@ func Run(ctx context.Context) {
|
||||
|
||||
}
|
||||
|
||||
func AddTask(ctx context.Context, task Exectable) error {
|
||||
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task))
|
||||
func AddTask(ctx context.Context, task Executable) error {
|
||||
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task.Title(), task))
|
||||
}
|
||||
|
||||
func CancelTask(ctx context.Context, id string) error {
|
||||
@@ -78,8 +79,13 @@ func CancelTask(ctx context.Context, id string) error {
|
||||
}
|
||||
|
||||
func GetLength(ctx context.Context) int {
|
||||
if queueInstance == nil {
|
||||
return 0
|
||||
}
|
||||
return queueInstance.ActiveLength()
|
||||
}
|
||||
|
||||
func GetRunningTasks(ctx context.Context) []queue.TaskInfo {
|
||||
return queueInstance.RunningTasks()
|
||||
}
|
||||
|
||||
func GetQueuedTasks(ctx context.Context) []queue.TaskInfo {
|
||||
return queueInstance.QueuedTasks()
|
||||
}
|
||||
|
||||
@@ -8,12 +8,15 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
"github.com/rs/xid"
|
||||
)
|
||||
|
||||
var _ core.Executable = (*Task)(nil)
|
||||
|
||||
type TaskElement struct {
|
||||
ID string
|
||||
Storage storage.Storage
|
||||
@@ -36,6 +39,11 @@ type Task struct {
|
||||
failed map[string]error // [TODO] errors for each element
|
||||
}
|
||||
|
||||
// Title implements core.Exectable.
|
||||
func (t *Task) Title() string {
|
||||
return fmt.Sprintf("[%s](%d files/%.2fMB)", t.Type(), len(t.elems), float64(t.totalSize)/(1024*1024))
|
||||
}
|
||||
|
||||
func (t *Task) Type() tasktype.TaskType {
|
||||
return tasktype.TaskTypeTgfiles
|
||||
}
|
||||
|
||||
@@ -2,11 +2,13 @@ package directlinks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
)
|
||||
@@ -25,6 +27,8 @@ func (f *File) FileSize() int64 {
|
||||
return f.Size
|
||||
}
|
||||
|
||||
var _ core.Executable = (*Task)(nil)
|
||||
|
||||
type Task struct {
|
||||
ID string
|
||||
ctx context.Context
|
||||
@@ -44,6 +48,11 @@ type Task struct {
|
||||
failed map[string]error // [TODO] errors for each file
|
||||
}
|
||||
|
||||
// Title implements core.Exectable.
|
||||
func (t *Task) Title() string {
|
||||
return fmt.Sprintf("[%s](%s...->%s:%s)", t.Type(), t.files[0].Name, t.Storage.Name(), t.StorPath)
|
||||
}
|
||||
|
||||
// DownloadedBytes implements TaskInfo.
|
||||
func (t *Task) DownloadedBytes() int64 {
|
||||
return t.downloadedBytes.Load()
|
||||
|
||||
@@ -2,17 +2,21 @@ package parsed
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/common/utils/netutil"
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
"github.com/krau/SaveAny-Bot/pkg/parser"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
)
|
||||
|
||||
var _ core.Executable = (*Task)(nil)
|
||||
|
||||
type Task struct {
|
||||
ID string
|
||||
Ctx context.Context
|
||||
@@ -20,8 +24,8 @@ type Task struct {
|
||||
StorPath string
|
||||
item *parser.Item
|
||||
httpClient *http.Client // [TODO] btorrent support?
|
||||
progress ProgressTracker
|
||||
stream bool
|
||||
progress ProgressTracker
|
||||
stream bool
|
||||
|
||||
totalResources int64
|
||||
downloaded atomic.Int64 // downloaded resources count
|
||||
@@ -32,6 +36,11 @@ type Task struct {
|
||||
failed map[string]error // [TODO] errors for each resource
|
||||
}
|
||||
|
||||
// Title implements core.Exectable.
|
||||
func (t *Task) Title() string {
|
||||
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.item.Title, t.Stor.Name(), t.StorPath)
|
||||
}
|
||||
|
||||
func (t *Task) Type() tasktype.TaskType {
|
||||
return tasktype.TaskTypeParseditem
|
||||
}
|
||||
|
||||
@@ -2,13 +2,17 @@ package telegraph
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
"github.com/krau/SaveAny-Bot/pkg/telegraph"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
)
|
||||
|
||||
var _ core.Executable = (*Task)(nil)
|
||||
|
||||
type Task struct {
|
||||
ID string
|
||||
Ctx context.Context
|
||||
@@ -24,6 +28,11 @@ type Task struct {
|
||||
downloaded atomic.Int64
|
||||
}
|
||||
|
||||
// Title implements core.Exectable.
|
||||
func (t *Task) Title() string {
|
||||
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.PhPath, t.Stor.Name(), t.StorPath)
|
||||
}
|
||||
|
||||
func (t *Task) Type() tasktype.TaskType {
|
||||
return tasktype.TaskTypeTphpics
|
||||
}
|
||||
|
||||
@@ -6,11 +6,14 @@ import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
)
|
||||
|
||||
var _ core.Executable = (*Task)(nil)
|
||||
|
||||
type Task struct {
|
||||
ID string
|
||||
Ctx context.Context
|
||||
@@ -22,6 +25,11 @@ type Task struct {
|
||||
localPath string
|
||||
}
|
||||
|
||||
// Title implements core.Exectable.
|
||||
func (t *Task) Title() string {
|
||||
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.File.Name(), t.Storage.Name(), t.Path)
|
||||
}
|
||||
|
||||
func (t *Task) Type() tasktype.TaskType {
|
||||
return tasktype.TaskTypeTgfiles
|
||||
}
|
||||
|
||||
@@ -125,6 +125,7 @@ func (tq *TaskQueue[T]) RunningTasks() []TaskInfo {
|
||||
}
|
||||
tasks = append(tasks, TaskInfo{
|
||||
ID: task.ID,
|
||||
Title: task.Title,
|
||||
Created: task.created,
|
||||
Cancelled: task.Cancelled(),
|
||||
})
|
||||
@@ -144,6 +145,7 @@ func (tq *TaskQueue[T]) QueuedTasks() []TaskInfo {
|
||||
if !task.Cancelled() {
|
||||
tasks = append(tasks, TaskInfo{
|
||||
ID: task.ID,
|
||||
Title: task.Title,
|
||||
Created: task.created,
|
||||
Cancelled: task.Cancelled(),
|
||||
})
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
// helper to create a simple Task with integer payload
|
||||
func newTask(id string) *queue.Task[int] {
|
||||
return queue.NewTask(context.Background(), id, 0)
|
||||
return queue.NewTask(context.Background(), id, "testing", 0)
|
||||
}
|
||||
|
||||
func TestAddAndLength(t *testing.T) {
|
||||
@@ -103,4 +103,4 @@ func TestConcurrencySafety(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
type Task[T any] struct {
|
||||
ID string
|
||||
Title string
|
||||
Data T
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -20,12 +21,14 @@ type TaskInfo struct {
|
||||
ID string
|
||||
Created time.Time
|
||||
Cancelled bool
|
||||
Title string
|
||||
}
|
||||
|
||||
func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
|
||||
func NewTask[T any](ctx context.Context, id string, title string, data T) *Task[T] {
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
return &Task[T]{
|
||||
ID: id,
|
||||
Title: title,
|
||||
Data: data,
|
||||
ctx: cancelCtx,
|
||||
cancel: cancel,
|
||||
|
||||
Reference in New Issue
Block a user