Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c0b4580e34 | ||
|
|
280fd6ead8 | ||
|
|
0ca3d97711 | ||
|
|
51198a1e3d | ||
|
|
651835c467 | ||
|
|
45c978980c |
@@ -26,3 +26,20 @@ func handleCancelCallback(ctx *ext.Context, update *ext.Update) error {
|
|||||||
|
|
||||||
return dispatcher.EndGroups
|
return dispatcher.EndGroups
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleCancelCmd(ctx *ext.Context, update *ext.Update) error {
|
||||||
|
logger := log.FromContext(ctx)
|
||||||
|
args := strings.Fields(update.EffectiveMessage.Text)
|
||||||
|
if len(args) < 2 {
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("用法: /cancel <task_id>"), nil)
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
taskID := args[1]
|
||||||
|
if err := core.CancelTask(ctx, taskID); err != nil {
|
||||||
|
logger.Errorf("failed to cancel task %s: %v", taskID, err)
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("已请求取消任务: "+taskID), nil)
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ var CommandHandlers = []DescCommandHandler{
|
|||||||
{"rule", "管理自动存储规则", handleRuleCmd},
|
{"rule", "管理自动存储规则", handleRuleCmd},
|
||||||
{"save", "保存文件", handleSilentMode(handleSaveCmd, handleSilentSaveReplied)},
|
{"save", "保存文件", handleSilentMode(handleSaveCmd, handleSilentSaveReplied)},
|
||||||
{"dl", "下载给定链接的文件", handleDlCmd},
|
{"dl", "下载给定链接的文件", handleDlCmd},
|
||||||
|
{"task", "管理任务队列", handleTaskCmd},
|
||||||
|
{"cancel", "取消任务", handleCancelCmd},
|
||||||
{"watch", "监听聊天(UserBot)", handleWatchCmd},
|
{"watch", "监听聊天(UserBot)", handleWatchCmd},
|
||||||
{"unwatch", "取消监听聊天(UserBot)", handleUnwatchCmd},
|
{"unwatch", "取消监听聊天(UserBot)", handleUnwatchCmd},
|
||||||
{"lswatch", "列出监听的聊天(UserBot)", handleLswatchCmd},
|
{"lswatch", "列出监听的聊天(UserBot)", handleLswatchCmd},
|
||||||
|
|||||||
113
client/bot/handlers/tasks.go
Normal file
113
client/bot/handlers/tasks.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/celestix/gotgproto/dispatcher"
|
||||||
|
"github.com/celestix/gotgproto/ext"
|
||||||
|
"github.com/charmbracelet/log"
|
||||||
|
"github.com/gotd/td/telegram/message/styling"
|
||||||
|
"github.com/krau/SaveAny-Bot/core"
|
||||||
|
)
|
||||||
|
|
||||||
|
func handleTaskCmd(ctx *ext.Context, update *ext.Update) error {
|
||||||
|
logger := log.FromContext(ctx)
|
||||||
|
args := strings.Fields(update.EffectiveMessage.Text)
|
||||||
|
if len(args) == 1 {
|
||||||
|
showRunningTasks(ctx, update)
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
|
||||||
|
switch args[1] {
|
||||||
|
case "running", "run", "r":
|
||||||
|
showRunningTasks(ctx, update)
|
||||||
|
case "queued", "queue", "q", "waiting":
|
||||||
|
showQueuedTasks(ctx, update)
|
||||||
|
case "cancel", "c":
|
||||||
|
if len(args) < 3 {
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("用法: /tasks cancel <task_id>"), nil)
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
taskID := args[2]
|
||||||
|
if err := core.CancelTask(ctx, taskID); err != nil {
|
||||||
|
logger.Errorf("取消任务 %s 失败: %v", taskID, err)
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
ctx.Reply(update, ext.ReplyTextStyledTextArray([]styling.StyledTextOption{
|
||||||
|
styling.Plain("已请求取消任务: "),
|
||||||
|
styling.Code(taskID),
|
||||||
|
}), nil)
|
||||||
|
default:
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("用法: /tasks [running|queued|cancel <task_id>]"), nil)
|
||||||
|
}
|
||||||
|
return dispatcher.EndGroups
|
||||||
|
}
|
||||||
|
|
||||||
|
func showRunningTasks(ctx *ext.Context, update *ext.Update) {
|
||||||
|
tasks := core.GetRunningTasks(ctx)
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("当前没有正在运行的任务"), nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opts := make([]styling.StyledTextOption, 0, 2+len(tasks)*4)
|
||||||
|
opts = append(opts,
|
||||||
|
styling.Bold("当前正在运行的任务:"),
|
||||||
|
styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
|
||||||
|
)
|
||||||
|
for _, t := range tasks {
|
||||||
|
created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
|
||||||
|
status := "运行中"
|
||||||
|
if t.Cancelled {
|
||||||
|
status = "已请求取消"
|
||||||
|
}
|
||||||
|
opts = append(opts,
|
||||||
|
styling.Plain("\nID: "),
|
||||||
|
styling.Code(t.ID),
|
||||||
|
styling.Plain("\n名称: "),
|
||||||
|
styling.Code(t.Title),
|
||||||
|
styling.Plain("\n创建时间: "),
|
||||||
|
styling.Code(created),
|
||||||
|
styling.Plain("\n状态: "),
|
||||||
|
styling.Code(status),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func showQueuedTasks(ctx *ext.Context, update *ext.Update) {
|
||||||
|
tasks := core.GetQueuedTasks(ctx)
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
ctx.Reply(update, ext.ReplyTextString("当前没有排队中的任务"), nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opts := make([]styling.StyledTextOption, 0, 2+len(tasks)*3)
|
||||||
|
opts = append(opts,
|
||||||
|
styling.Bold("当前排队中的任务:"),
|
||||||
|
styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
|
||||||
|
)
|
||||||
|
for _, t := range tasks {
|
||||||
|
created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
|
||||||
|
status := "排队中"
|
||||||
|
if t.Cancelled {
|
||||||
|
status = "已请求取消"
|
||||||
|
}
|
||||||
|
opts = append(opts,
|
||||||
|
styling.Plain("\nID: "),
|
||||||
|
styling.Code(t.ID),
|
||||||
|
styling.Plain("\n名称: "),
|
||||||
|
styling.Code(t.Title),
|
||||||
|
styling.Plain("\n创建时间: "),
|
||||||
|
styling.Code(created),
|
||||||
|
styling.Plain("\n状态: "),
|
||||||
|
styling.Code(status),
|
||||||
|
)
|
||||||
|
if len(tasks) > 10 {
|
||||||
|
opts = append(opts, styling.Plain("\n...\n只显示前 10 个任务, 共 "+fmt.Sprintf("%d", len(tasks))+" 个任务"))
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
|
||||||
|
}
|
||||||
@@ -12,6 +12,11 @@ type TelegramStorageConfig struct {
|
|||||||
ForceFile bool `toml:"force_file" mapstructure:"force_file" json:"force_file"`
|
ForceFile bool `toml:"force_file" mapstructure:"force_file" json:"force_file"`
|
||||||
RateLimit int `toml:"rate_limit" mapstructure:"rate_limit" json:"rate_limit"`
|
RateLimit int `toml:"rate_limit" mapstructure:"rate_limit" json:"rate_limit"`
|
||||||
RateBurst int `toml:"rate_burst" mapstructure:"rate_burst" json:"rate_burst"`
|
RateBurst int `toml:"rate_burst" mapstructure:"rate_burst" json:"rate_burst"`
|
||||||
|
SkipLarge bool `toml:"skip_large" mapstructure:"skip_large" json:"skip_large"` // skip files larger than Telegram limit(2GB)
|
||||||
|
// split files larger than Telegram limit(2GB) into parts of specified size, in MB, leave 0 to set default(2000MB)
|
||||||
|
// only effective when SkipLarge is false
|
||||||
|
// use zip when splitting
|
||||||
|
SplitSizeMB int64 `toml:"split_size_mb" mapstructure:"split_size_mb" json:"split_size_mb"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *TelegramStorageConfig) Validate() error {
|
func (m *TelegramStorageConfig) Validate() error {
|
||||||
|
|||||||
44
core/core.go
44
core/core.go
@@ -10,15 +10,16 @@ import (
|
|||||||
"github.com/krau/SaveAny-Bot/pkg/queue"
|
"github.com/krau/SaveAny-Bot/pkg/queue"
|
||||||
)
|
)
|
||||||
|
|
||||||
var queueInstance *queue.TaskQueue[Exectable]
|
var queueInstance *queue.TaskQueue[Executable]
|
||||||
|
|
||||||
type Exectable interface {
|
type Executable interface {
|
||||||
Type() tasktype.TaskType
|
Type() tasktype.TaskType
|
||||||
|
Title() string
|
||||||
TaskID() string
|
TaskID() string
|
||||||
Execute(ctx context.Context) error
|
Execute(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan struct{}) {
|
func worker(ctx context.Context, qe *queue.TaskQueue[Executable], semaphore chan struct{}) {
|
||||||
logger := log.FromContext(ctx)
|
logger := log.FromContext(ctx)
|
||||||
execHooks := config.C().Hook.Exec
|
execHooks := config.C().Hook.Exec
|
||||||
for {
|
for {
|
||||||
@@ -28,27 +29,27 @@ func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan
|
|||||||
logger.Error("Failed to get task from queue:", err)
|
logger.Error("Failed to get task from queue:", err)
|
||||||
break // queue closed and empty
|
break // queue closed and empty
|
||||||
}
|
}
|
||||||
task := qtask.Data
|
exe := qtask.Data
|
||||||
logger.Infof("Processing task: %s", task.TaskID())
|
logger.Infof("Processing task: %s", exe.TaskID())
|
||||||
if err := ExecCommandString(qtask.Context(), execHooks.TaskBeforeStart); err != nil {
|
if err := ExecCommandString(qtask.Context(), execHooks.TaskBeforeStart); err != nil {
|
||||||
logger.Errorf("Failed to execute before start hook for task %s: %v", task.TaskID(), err)
|
logger.Errorf("Failed to execute before start hook for task %s: %v", exe.TaskID(), err)
|
||||||
}
|
}
|
||||||
if err := task.Execute(qtask.Context()); err != nil {
|
if err := exe.Execute(qtask.Context()); err != nil {
|
||||||
if errors.Is(err, context.Canceled) {
|
if errors.Is(err, context.Canceled) {
|
||||||
logger.Infof("Task %s was canceled", task.TaskID())
|
logger.Infof("Task %s was canceled", exe.TaskID())
|
||||||
if err := ExecCommandString(ctx, execHooks.TaskCancel); err != nil {
|
if err := ExecCommandString(ctx, execHooks.TaskCancel); err != nil {
|
||||||
logger.Errorf("Failed to execute cancel hook for task %s: %v", task.TaskID(), err)
|
logger.Errorf("Failed to execute cancel hook for task %s: %v", exe.TaskID(), err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.Errorf("Failed to execute task %s: %v", task.TaskID(), err)
|
logger.Errorf("Failed to execute task %s: %v", exe.TaskID(), err)
|
||||||
if err := ExecCommandString(ctx, execHooks.TaskFail); err != nil {
|
if err := ExecCommandString(ctx, execHooks.TaskFail); err != nil {
|
||||||
logger.Errorf("Failed to execute fail hook for task %s: %v", task.TaskID(), err)
|
logger.Errorf("Failed to execute fail hook for task %s: %v", exe.TaskID(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.Infof("Task %s completed successfully", task.TaskID())
|
logger.Infof("Task %s completed successfully", exe.TaskID())
|
||||||
if err := ExecCommandString(ctx, execHooks.TaskSuccess); err != nil {
|
if err := ExecCommandString(ctx, execHooks.TaskSuccess); err != nil {
|
||||||
logger.Errorf("Failed to execute success hook for task %s: %v", task.TaskID(), err)
|
logger.Errorf("Failed to execute success hook for task %s: %v", exe.TaskID(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qe.Done(qtask.ID)
|
qe.Done(qtask.ID)
|
||||||
@@ -60,7 +61,7 @@ func Run(ctx context.Context) {
|
|||||||
log.FromContext(ctx).Info("Start processing tasks...")
|
log.FromContext(ctx).Info("Start processing tasks...")
|
||||||
semaphore := make(chan struct{}, config.C().Workers)
|
semaphore := make(chan struct{}, config.C().Workers)
|
||||||
if queueInstance == nil {
|
if queueInstance == nil {
|
||||||
queueInstance = queue.NewTaskQueue[Exectable]()
|
queueInstance = queue.NewTaskQueue[Executable]()
|
||||||
}
|
}
|
||||||
for range config.C().Workers {
|
for range config.C().Workers {
|
||||||
go worker(ctx, queueInstance, semaphore)
|
go worker(ctx, queueInstance, semaphore)
|
||||||
@@ -68,8 +69,8 @@ func Run(ctx context.Context) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddTask(ctx context.Context, task Exectable) error {
|
func AddTask(ctx context.Context, task Executable) error {
|
||||||
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task))
|
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task.Title(), task))
|
||||||
}
|
}
|
||||||
|
|
||||||
func CancelTask(ctx context.Context, id string) error {
|
func CancelTask(ctx context.Context, id string) error {
|
||||||
@@ -78,8 +79,13 @@ func CancelTask(ctx context.Context, id string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func GetLength(ctx context.Context) int {
|
func GetLength(ctx context.Context) int {
|
||||||
if queueInstance == nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return queueInstance.ActiveLength()
|
return queueInstance.ActiveLength()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetRunningTasks(ctx context.Context) []queue.TaskInfo {
|
||||||
|
return queueInstance.RunningTasks()
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetQueuedTasks(ctx context.Context) []queue.TaskInfo {
|
||||||
|
return queueInstance.QueuedTasks()
|
||||||
|
}
|
||||||
|
|||||||
@@ -8,12 +8,15 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/krau/SaveAny-Bot/config"
|
"github.com/krau/SaveAny-Bot/config"
|
||||||
|
"github.com/krau/SaveAny-Bot/core"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
||||||
"github.com/krau/SaveAny-Bot/storage"
|
"github.com/krau/SaveAny-Bot/storage"
|
||||||
"github.com/rs/xid"
|
"github.com/rs/xid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ core.Executable = (*Task)(nil)
|
||||||
|
|
||||||
type TaskElement struct {
|
type TaskElement struct {
|
||||||
ID string
|
ID string
|
||||||
Storage storage.Storage
|
Storage storage.Storage
|
||||||
@@ -36,6 +39,11 @@ type Task struct {
|
|||||||
failed map[string]error // [TODO] errors for each element
|
failed map[string]error // [TODO] errors for each element
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Title implements core.Exectable.
|
||||||
|
func (t *Task) Title() string {
|
||||||
|
return fmt.Sprintf("[%s](%d files/%.2fMB)", t.Type(), len(t.elems), float64(t.totalSize)/(1024*1024))
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Task) Type() tasktype.TaskType {
|
func (t *Task) Type() tasktype.TaskType {
|
||||||
return tasktype.TaskTypeTgfiles
|
return tasktype.TaskTypeTgfiles
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,13 @@ package directlinks
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/krau/SaveAny-Bot/config"
|
"github.com/krau/SaveAny-Bot/config"
|
||||||
|
"github.com/krau/SaveAny-Bot/core"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||||
"github.com/krau/SaveAny-Bot/storage"
|
"github.com/krau/SaveAny-Bot/storage"
|
||||||
)
|
)
|
||||||
@@ -25,6 +27,8 @@ func (f *File) FileSize() int64 {
|
|||||||
return f.Size
|
return f.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ core.Executable = (*Task)(nil)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID string
|
ID string
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@@ -44,6 +48,11 @@ type Task struct {
|
|||||||
failed map[string]error // [TODO] errors for each file
|
failed map[string]error // [TODO] errors for each file
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Title implements core.Exectable.
|
||||||
|
func (t *Task) Title() string {
|
||||||
|
return fmt.Sprintf("[%s](%s...->%s:%s)", t.Type(), t.files[0].Name, t.Storage.Name(), t.StorPath)
|
||||||
|
}
|
||||||
|
|
||||||
// DownloadedBytes implements TaskInfo.
|
// DownloadedBytes implements TaskInfo.
|
||||||
func (t *Task) DownloadedBytes() int64 {
|
func (t *Task) DownloadedBytes() int64 {
|
||||||
return t.downloadedBytes.Load()
|
return t.downloadedBytes.Load()
|
||||||
|
|||||||
@@ -2,17 +2,21 @@ package parsed
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/krau/SaveAny-Bot/common/utils/netutil"
|
"github.com/krau/SaveAny-Bot/common/utils/netutil"
|
||||||
"github.com/krau/SaveAny-Bot/config"
|
"github.com/krau/SaveAny-Bot/config"
|
||||||
|
"github.com/krau/SaveAny-Bot/core"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/parser"
|
"github.com/krau/SaveAny-Bot/pkg/parser"
|
||||||
"github.com/krau/SaveAny-Bot/storage"
|
"github.com/krau/SaveAny-Bot/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ core.Executable = (*Task)(nil)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID string
|
ID string
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
@@ -20,8 +24,8 @@ type Task struct {
|
|||||||
StorPath string
|
StorPath string
|
||||||
item *parser.Item
|
item *parser.Item
|
||||||
httpClient *http.Client // [TODO] btorrent support?
|
httpClient *http.Client // [TODO] btorrent support?
|
||||||
progress ProgressTracker
|
progress ProgressTracker
|
||||||
stream bool
|
stream bool
|
||||||
|
|
||||||
totalResources int64
|
totalResources int64
|
||||||
downloaded atomic.Int64 // downloaded resources count
|
downloaded atomic.Int64 // downloaded resources count
|
||||||
@@ -32,6 +36,11 @@ type Task struct {
|
|||||||
failed map[string]error // [TODO] errors for each resource
|
failed map[string]error // [TODO] errors for each resource
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Title implements core.Exectable.
|
||||||
|
func (t *Task) Title() string {
|
||||||
|
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.item.Title, t.Stor.Name(), t.StorPath)
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Task) Type() tasktype.TaskType {
|
func (t *Task) Type() tasktype.TaskType {
|
||||||
return tasktype.TaskTypeParseditem
|
return tasktype.TaskTypeParseditem
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,13 +2,17 @@ package telegraph
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/krau/SaveAny-Bot/core"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/telegraph"
|
"github.com/krau/SaveAny-Bot/pkg/telegraph"
|
||||||
"github.com/krau/SaveAny-Bot/storage"
|
"github.com/krau/SaveAny-Bot/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ core.Executable = (*Task)(nil)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID string
|
ID string
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
@@ -24,6 +28,11 @@ type Task struct {
|
|||||||
downloaded atomic.Int64
|
downloaded atomic.Int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Title implements core.Exectable.
|
||||||
|
func (t *Task) Title() string {
|
||||||
|
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.PhPath, t.Stor.Name(), t.StorPath)
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Task) Type() tasktype.TaskType {
|
func (t *Task) Type() tasktype.TaskType {
|
||||||
return tasktype.TaskTypeTphpics
|
return tasktype.TaskTypeTphpics
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,11 +6,14 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/krau/SaveAny-Bot/config"
|
"github.com/krau/SaveAny-Bot/config"
|
||||||
|
"github.com/krau/SaveAny-Bot/core"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||||
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
||||||
"github.com/krau/SaveAny-Bot/storage"
|
"github.com/krau/SaveAny-Bot/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var _ core.Executable = (*Task)(nil)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID string
|
ID string
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
@@ -22,6 +25,11 @@ type Task struct {
|
|||||||
localPath string
|
localPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Title implements core.Exectable.
|
||||||
|
func (t *Task) Title() string {
|
||||||
|
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.File.Name(), t.Storage.Name(), t.Path)
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Task) Type() tasktype.TaskType {
|
func (t *Task) Type() tasktype.TaskType {
|
||||||
return tasktype.TaskTypeTgfiles
|
return tasktype.TaskTypeTgfiles
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func (tq *TaskQueue[T]) Add(task *Task[T]) error {
|
|||||||
return fmt.Errorf("task with ID %s already exists", task.ID)
|
return fmt.Errorf("task with ID %s already exists", task.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if task.IsCancelled() {
|
if task.Cancelled() {
|
||||||
return fmt.Errorf("task %s has been cancelled", task.ID)
|
return fmt.Errorf("task %s has been cancelled", task.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -50,6 +50,8 @@ func (tq *TaskQueue[T]) Add(task *Task[T]) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get retrieves and removes the next non-cancelled task from the queue, adding it to the running tasks.
|
||||||
|
// Blocks until a task is available or the queue is closed.
|
||||||
func (tq *TaskQueue[T]) Get() (*Task[T], error) {
|
func (tq *TaskQueue[T]) Get() (*Task[T], error) {
|
||||||
tq.mu.Lock()
|
tq.mu.Lock()
|
||||||
defer tq.mu.Unlock()
|
defer tq.mu.Unlock()
|
||||||
@@ -69,7 +71,7 @@ func (tq *TaskQueue[T]) Get() (*Task[T], error) {
|
|||||||
tq.tasks.Remove(element)
|
tq.tasks.Remove(element)
|
||||||
task.element = nil
|
task.element = nil
|
||||||
|
|
||||||
if !task.IsCancelled() {
|
if !task.Cancelled() {
|
||||||
tq.runningTaskMap[task.ID] = task
|
tq.runningTaskMap[task.ID] = task
|
||||||
return task, nil
|
return task, nil
|
||||||
}
|
}
|
||||||
@@ -82,38 +84,21 @@ func (tq *TaskQueue[T]) Get() (*Task[T], error) {
|
|||||||
return nil, fmt.Errorf("queue is closed and empty")
|
return nil, fmt.Errorf("queue is closed and empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Done stops(cancels) and removes the task from the running tasks.
|
||||||
func (tq *TaskQueue[T]) Done(taskID string) {
|
func (tq *TaskQueue[T]) Done(taskID string) {
|
||||||
tq.mu.Lock()
|
tq.mu.Lock()
|
||||||
defer tq.mu.Unlock()
|
defer tq.mu.Unlock()
|
||||||
|
|
||||||
delete(tq.taskMap, taskID)
|
delete(tq.taskMap, taskID)
|
||||||
delete(tq.runningTaskMap, taskID)
|
delete(tq.runningTaskMap, taskID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) Peek() (*Task[T], error) {
|
|
||||||
tq.mu.RLock()
|
|
||||||
defer tq.mu.RUnlock()
|
|
||||||
|
|
||||||
if tq.tasks.Len() == 0 {
|
|
||||||
return nil, fmt.Errorf("queue is empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
for element := tq.tasks.Front(); element != nil; element = element.Next() {
|
|
||||||
task := element.Value.(*Task[T])
|
|
||||||
if !task.IsCancelled() {
|
|
||||||
return task, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("queue has no valid tasks")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) Length() int {
|
func (tq *TaskQueue[T]) Length() int {
|
||||||
tq.mu.RLock()
|
tq.mu.RLock()
|
||||||
defer tq.mu.RUnlock()
|
defer tq.mu.RUnlock()
|
||||||
return tq.tasks.Len()
|
return tq.tasks.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ActiveLength returns the number of non-cancelled tasks in the queue.
|
||||||
func (tq *TaskQueue[T]) ActiveLength() int {
|
func (tq *TaskQueue[T]) ActiveLength() int {
|
||||||
tq.mu.RLock()
|
tq.mu.RLock()
|
||||||
defer tq.mu.RUnlock()
|
defer tq.mu.RUnlock()
|
||||||
@@ -121,13 +106,58 @@ func (tq *TaskQueue[T]) ActiveLength() int {
|
|||||||
count := 0
|
count := 0
|
||||||
for element := tq.tasks.Front(); element != nil; element = element.Next() {
|
for element := tq.tasks.Front(); element != nil; element = element.Next() {
|
||||||
task := element.Value.(*Task[T])
|
task := element.Value.(*Task[T])
|
||||||
if !task.IsCancelled() {
|
if !task.Cancelled() {
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RunningTasks returns the currently running tasks' info.
|
||||||
|
func (tq *TaskQueue[T]) RunningTasks() []TaskInfo {
|
||||||
|
tq.mu.RLock()
|
||||||
|
defer tq.mu.RUnlock()
|
||||||
|
|
||||||
|
tasks := make([]TaskInfo, 0, len(tq.runningTaskMap))
|
||||||
|
for _, task := range tq.runningTaskMap {
|
||||||
|
if task.Cancelled() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tasks = append(tasks, TaskInfo{
|
||||||
|
ID: task.ID,
|
||||||
|
Title: task.Title,
|
||||||
|
Created: task.created,
|
||||||
|
Cancelled: task.Cancelled(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueuedTasks returns the queued (not yet running) tasks' info.
|
||||||
|
// The sorting is in the order of addition.
|
||||||
|
func (tq *TaskQueue[T]) QueuedTasks() []TaskInfo {
|
||||||
|
tq.mu.RLock()
|
||||||
|
defer tq.mu.RUnlock()
|
||||||
|
|
||||||
|
tasks := make([]TaskInfo, 0, tq.tasks.Len())
|
||||||
|
for element := tq.tasks.Front(); element != nil; element = element.Next() {
|
||||||
|
task := element.Value.(*Task[T])
|
||||||
|
if !task.Cancelled() {
|
||||||
|
tasks = append(tasks, TaskInfo{
|
||||||
|
ID: task.ID,
|
||||||
|
Title: task.Title,
|
||||||
|
Created: task.created,
|
||||||
|
Cancelled: task.Cancelled(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tasks
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelTask cancels a task by its ID.
|
||||||
|
// It looks for the task in both queued and running tasks.
|
||||||
|
// [NOTE] Cancelled tasks will not be removed from the queue, but marked as cancelled. Use Done to remove them.
|
||||||
|
// [WARN] Cancelling a running task relies on the task's implementation to respect the cancellation. If the task does not check for cancellation, it may continue running.
|
||||||
func (tq *TaskQueue[T]) CancelTask(taskID string) error {
|
func (tq *TaskQueue[T]) CancelTask(taskID string) error {
|
||||||
tq.mu.RLock()
|
tq.mu.RLock()
|
||||||
task, exists := tq.taskMap[taskID]
|
task, exists := tq.taskMap[taskID]
|
||||||
@@ -144,52 +174,6 @@ func (tq *TaskQueue[T]) CancelTask(taskID string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) RemoveTask(taskID string) error {
|
|
||||||
tq.mu.Lock()
|
|
||||||
defer tq.mu.Unlock()
|
|
||||||
|
|
||||||
task, exists := tq.taskMap[taskID]
|
|
||||||
if !exists {
|
|
||||||
_, exists = tq.runningTaskMap[taskID]
|
|
||||||
if exists {
|
|
||||||
delete(tq.runningTaskMap, taskID)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("task %s is already running, cannot remove from queue", taskID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if task.element != nil {
|
|
||||||
tq.tasks.Remove(task.element)
|
|
||||||
}
|
|
||||||
delete(tq.taskMap, taskID)
|
|
||||||
task.Cancel()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) CancelAll() {
|
|
||||||
tq.mu.RLock()
|
|
||||||
tasks := make([]*Task[T], 0, tq.tasks.Len())
|
|
||||||
for element := tq.tasks.Front(); element != nil; element = element.Next() {
|
|
||||||
tasks = append(tasks, element.Value.(*Task[T]))
|
|
||||||
}
|
|
||||||
tq.mu.RUnlock()
|
|
||||||
|
|
||||||
for _, task := range tasks {
|
|
||||||
task.Cancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) GetTask(taskID string) (*Task[T], error) {
|
|
||||||
tq.mu.RLock()
|
|
||||||
defer tq.mu.RUnlock()
|
|
||||||
|
|
||||||
task, exists := tq.taskMap[taskID]
|
|
||||||
if !exists {
|
|
||||||
return nil, fmt.Errorf("task %s does not exist", taskID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return task, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) Close() {
|
func (tq *TaskQueue[T]) Close() {
|
||||||
tq.mu.Lock()
|
tq.mu.Lock()
|
||||||
defer tq.mu.Unlock()
|
defer tq.mu.Unlock()
|
||||||
@@ -197,45 +181,3 @@ func (tq *TaskQueue[T]) Close() {
|
|||||||
tq.closed = true
|
tq.closed = true
|
||||||
tq.cond.Broadcast()
|
tq.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) IsClosed() bool {
|
|
||||||
tq.mu.RLock()
|
|
||||||
defer tq.mu.RUnlock()
|
|
||||||
return tq.closed
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) Clear() {
|
|
||||||
tq.mu.Lock()
|
|
||||||
defer tq.mu.Unlock()
|
|
||||||
|
|
||||||
for element := tq.tasks.Front(); element != nil; element = element.Next() {
|
|
||||||
task := element.Value.(*Task[T])
|
|
||||||
task.Cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
tq.tasks.Init()
|
|
||||||
tq.taskMap = make(map[string]*Task[T])
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tq *TaskQueue[T]) CleanupCancelled() int {
|
|
||||||
tq.mu.Lock()
|
|
||||||
defer tq.mu.Unlock()
|
|
||||||
|
|
||||||
removed := 0
|
|
||||||
element := tq.tasks.Front()
|
|
||||||
|
|
||||||
for element != nil {
|
|
||||||
next := element.Next()
|
|
||||||
task := element.Value.(*Task[T])
|
|
||||||
|
|
||||||
if task.IsCancelled() {
|
|
||||||
tq.tasks.Remove(element)
|
|
||||||
delete(tq.taskMap, task.ID)
|
|
||||||
removed++
|
|
||||||
}
|
|
||||||
|
|
||||||
element = next
|
|
||||||
}
|
|
||||||
|
|
||||||
return removed
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ import (
|
|||||||
|
|
||||||
// helper to create a simple Task with integer payload
|
// helper to create a simple Task with integer payload
|
||||||
func newTask(id string) *queue.Task[int] {
|
func newTask(id string) *queue.Task[int] {
|
||||||
return queue.NewTask(context.Background(), id, 0)
|
return queue.NewTask(context.Background(), id, "testing", 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddAndLength(t *testing.T) {
|
func TestAddAndLength(t *testing.T) {
|
||||||
@@ -39,37 +39,6 @@ func TestDuplicateAdd(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetAndPeek(t *testing.T) {
|
|
||||||
q := queue.NewTaskQueue[int]()
|
|
||||||
t1 := newTask("a")
|
|
||||||
t2 := newTask("b")
|
|
||||||
q.Add(t1)
|
|
||||||
q.Add(t2)
|
|
||||||
// Peek should return t1
|
|
||||||
peeked, err := q.Peek()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error on Peek: %v", err)
|
|
||||||
}
|
|
||||||
if peeked.ID != "a" {
|
|
||||||
t.Fatalf("expected Peek ID 'a', got '%s'", peeked.ID)
|
|
||||||
}
|
|
||||||
// Get should return t1 then t2
|
|
||||||
first, err := q.Get()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error on Get: %v", err)
|
|
||||||
}
|
|
||||||
if first.ID != "a" {
|
|
||||||
t.Fatalf("expected first Get ID 'a', got '%s'", first.ID)
|
|
||||||
}
|
|
||||||
second, err := q.Get()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error on second Get: %v", err)
|
|
||||||
}
|
|
||||||
if second.ID != "b" {
|
|
||||||
t.Fatalf("expected second Get ID 'b', got '%s'", second.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCancelAndActiveLength(t *testing.T) {
|
func TestCancelAndActiveLength(t *testing.T) {
|
||||||
q := queue.NewTaskQueue[int]()
|
q := queue.NewTaskQueue[int]()
|
||||||
t1 := newTask("1")
|
t1 := newTask("1")
|
||||||
@@ -90,41 +59,6 @@ func TestCancelAndActiveLength(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRemoveTask(t *testing.T) {
|
|
||||||
q := queue.NewTaskQueue[int]()
|
|
||||||
t1 := newTask("r1")
|
|
||||||
q.Add(t1)
|
|
||||||
if err := q.RemoveTask("r1"); err != nil {
|
|
||||||
t.Fatalf("unexpected error on RemoveTask: %v", err)
|
|
||||||
}
|
|
||||||
if q.Length() != 0 {
|
|
||||||
t.Fatalf("expected length 0 after remove, got %d", q.Length())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClearAndCleanupCancelled(t *testing.T) {
|
|
||||||
q := queue.NewTaskQueue[int]()
|
|
||||||
tasks := []*queue.Task[int]{newTask("c1"), newTask("c2"), newTask("c3")}
|
|
||||||
for _, tsk := range tasks {
|
|
||||||
q.Add(tsk)
|
|
||||||
}
|
|
||||||
// Cancel one
|
|
||||||
q.CancelTask("c2")
|
|
||||||
// Cleanup cancelled
|
|
||||||
removed := q.CleanupCancelled()
|
|
||||||
if removed != 1 {
|
|
||||||
t.Fatalf("expected removed 1, got %d", removed)
|
|
||||||
}
|
|
||||||
if q.ActiveLength() != 2 {
|
|
||||||
t.Fatalf("expected active length 2 after cleanup, got %d", q.ActiveLength())
|
|
||||||
}
|
|
||||||
// Clear all
|
|
||||||
q.Clear()
|
|
||||||
if q.Length() != 0 {
|
|
||||||
t.Fatalf("expected length 0 after clear, got %d", q.Length())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCloseBehavior(t *testing.T) {
|
func TestCloseBehavior(t *testing.T) {
|
||||||
q := queue.NewTaskQueue[int]()
|
q := queue.NewTaskQueue[int]()
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
type Task[T any] struct {
|
type Task[T any] struct {
|
||||||
ID string
|
ID string
|
||||||
|
Title string
|
||||||
Data T
|
Data T
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@@ -15,10 +16,19 @@ type Task[T any] struct {
|
|||||||
element *list.Element
|
element *list.Element
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
|
// Read-only info about a task
|
||||||
|
type TaskInfo struct {
|
||||||
|
ID string
|
||||||
|
Created time.Time
|
||||||
|
Cancelled bool
|
||||||
|
Title string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTask[T any](ctx context.Context, id string, title string, data T) *Task[T] {
|
||||||
cancelCtx, cancel := context.WithCancel(ctx)
|
cancelCtx, cancel := context.WithCancel(ctx)
|
||||||
return &Task[T]{
|
return &Task[T]{
|
||||||
ID: id,
|
ID: id,
|
||||||
|
Title: title,
|
||||||
Data: data,
|
Data: data,
|
||||||
ctx: cancelCtx,
|
ctx: cancelCtx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@@ -26,7 +36,7 @@ func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Task[T]) IsCancelled() bool {
|
func (t *Task[T]) Cancelled() bool {
|
||||||
select {
|
select {
|
||||||
case <-t.ctx.Done():
|
case <-t.ctx.Done():
|
||||||
return true
|
return true
|
||||||
|
|||||||
147
storage/telegram/split.go
Normal file
147
storage/telegram/split.go
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
package telegram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/zip"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type splitWriter struct {
|
||||||
|
baseName string
|
||||||
|
partSize int64
|
||||||
|
currentPart int
|
||||||
|
currentSize int64
|
||||||
|
currentFile *os.File
|
||||||
|
totalParts int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSplitWriter(baseName string, partSize int64) *splitWriter {
|
||||||
|
return &splitWriter{
|
||||||
|
baseName: baseName,
|
||||||
|
partSize: partSize,
|
||||||
|
currentPart: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write implements io.Writer interface
|
||||||
|
func (w *splitWriter) Write(p []byte) (n int, err error) {
|
||||||
|
written := 0
|
||||||
|
for written < len(p) {
|
||||||
|
if w.currentFile == nil || w.currentSize >= w.partSize {
|
||||||
|
if err := w.nextPart(); err != nil {
|
||||||
|
return written, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
toWrite := int64(len(p) - written)
|
||||||
|
remaining := w.partSize - w.currentSize
|
||||||
|
if toWrite > remaining {
|
||||||
|
toWrite = remaining
|
||||||
|
}
|
||||||
|
|
||||||
|
nw, err := w.currentFile.Write(p[written : written+int(toWrite)])
|
||||||
|
written += nw
|
||||||
|
w.currentSize += int64(nw)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return written, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return written, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *splitWriter) Close() error {
|
||||||
|
if w.currentFile != nil {
|
||||||
|
return w.currentFile.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *splitWriter) nextPart() error {
|
||||||
|
if w.currentFile != nil {
|
||||||
|
if err := w.currentFile.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
partName := w.partName(w.currentPart)
|
||||||
|
file, err := os.Create(partName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.currentFile = file
|
||||||
|
w.currentSize = 0
|
||||||
|
w.currentPart++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *splitWriter) partName(partNum int) string {
|
||||||
|
// file.zip.001, file.zip.002, ...
|
||||||
|
return fmt.Sprintf("%s.zip.%03d", w.baseName, partNum+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *splitWriter) finalize() error {
|
||||||
|
w.totalParts = w.currentPart
|
||||||
|
|
||||||
|
// 如果只有一个分卷,直接重命名为 .zip
|
||||||
|
if w.totalParts == 1 {
|
||||||
|
oldName := fmt.Sprintf("%s.zip.001", w.baseName)
|
||||||
|
newName := fmt.Sprintf("%s.zip", w.baseName)
|
||||||
|
return os.Rename(oldName, newName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateSplitZip(ctx context.Context, reader io.Reader, size int64, fileName, outputBase string, partSize int64) error {
|
||||||
|
// seek the reader if possible
|
||||||
|
if rs, ok := reader.(io.ReadSeeker); ok {
|
||||||
|
if _, err := rs.Seek(0, io.SeekStart); err != nil {
|
||||||
|
return fmt.Errorf("failed to seek reader: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
outputDir := filepath.Dir(outputBase)
|
||||||
|
if err := os.MkdirAll(outputDir, os.ModePerm); err != nil {
|
||||||
|
return fmt.Errorf("failed to create output directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
splitWriter := newSplitWriter(outputBase, partSize)
|
||||||
|
defer splitWriter.Close()
|
||||||
|
|
||||||
|
zipWriter := zip.NewWriter(splitWriter)
|
||||||
|
defer zipWriter.Close()
|
||||||
|
|
||||||
|
header := &zip.FileHeader{
|
||||||
|
Name: fileName,
|
||||||
|
Method: zip.Store, // just store without compression
|
||||||
|
Modified: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, err := zipWriter.CreateHeader(header)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create zip header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
copied, err := io.Copy(writer, reader)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to write data: %w", err)
|
||||||
|
}
|
||||||
|
if copied != size {
|
||||||
|
return fmt.Errorf("incomplete write: expected %d bytes, got %d bytes", size, copied)
|
||||||
|
}
|
||||||
|
if err := zipWriter.Close(); err != nil {
|
||||||
|
return fmt.Errorf("failed to close zip writer: %w", err)
|
||||||
|
}
|
||||||
|
if err := splitWriter.Close(); err != nil {
|
||||||
|
return fmt.Errorf("failed to close split writer: %w", err)
|
||||||
|
}
|
||||||
|
if err := splitWriter.finalize(); err != nil {
|
||||||
|
return fmt.Errorf("failed to rename split files: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
55
storage/telegram/split_test.go
Normal file
55
storage/telegram/split_test.go
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
package telegram
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCreateSplitZip(t *testing.T) {
|
||||||
|
input := "tests/testfile.dat"
|
||||||
|
file, err := os.Open(input)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to open test file: %v", err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
fileName := filepath.Base(input)
|
||||||
|
fileInfo, err := file.Stat()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to stat test file: %v", err)
|
||||||
|
}
|
||||||
|
fileSize := fileInfo.Size()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
partSize int64
|
||||||
|
output string
|
||||||
|
}{
|
||||||
|
{partSize: int64(1024 * 1024 * 500), output: "tests/split_test_output_500MB"},
|
||||||
|
{partSize: int64(1024 * 1024 * 100), output: "tests/split_test_output_100MB"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
err = CreateSplitZip(t.Context(), file, fileSize, fileName, tt.output, tt.partSize)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("CreateSplitZip failed: %v", err)
|
||||||
|
}
|
||||||
|
matched, err := filepath.Glob(tt.output + ".z*")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to glob split files: %v", err)
|
||||||
|
}
|
||||||
|
if len(matched) == 0 {
|
||||||
|
t.Fatalf("no split files found")
|
||||||
|
}
|
||||||
|
t.Logf("Created %d split files", len(matched))
|
||||||
|
for _, f := range matched {
|
||||||
|
info, err := os.Stat(f)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to stat file %s: %v", f, err)
|
||||||
|
}
|
||||||
|
if info.Size() > tt.partSize {
|
||||||
|
t.Errorf("file %s exceeds part size: %d > %d", f, info.Size(), tt.partSize)
|
||||||
|
}
|
||||||
|
t.Logf(" - %s (%d bytes)", f, info.Size())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,10 +4,13 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/celestix/gotgproto/ext"
|
||||||
"github.com/charmbracelet/log"
|
"github.com/charmbracelet/log"
|
||||||
"github.com/duke-git/lancet/v2/slice"
|
"github.com/duke-git/lancet/v2/slice"
|
||||||
"github.com/duke-git/lancet/v2/validator"
|
"github.com/duke-git/lancet/v2/validator"
|
||||||
@@ -16,6 +19,7 @@ import (
|
|||||||
"github.com/gotd/td/telegram/message/styling"
|
"github.com/gotd/td/telegram/message/styling"
|
||||||
"github.com/gotd/td/telegram/uploader"
|
"github.com/gotd/td/telegram/uploader"
|
||||||
"github.com/gotd/td/tg"
|
"github.com/gotd/td/tg"
|
||||||
|
"github.com/krau/SaveAny-Bot/common/utils/dlutil"
|
||||||
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
|
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
|
||||||
"github.com/krau/SaveAny-Bot/config"
|
"github.com/krau/SaveAny-Bot/config"
|
||||||
storconfig "github.com/krau/SaveAny-Bot/config/storage"
|
storconfig "github.com/krau/SaveAny-Bot/config/storage"
|
||||||
@@ -26,6 +30,12 @@ import (
|
|||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// https://core.telegram.org/api/config#upload-max-fileparts-default
|
||||||
|
DefaultSplitSize = 4000 * 524288 // 4000 * 512 KB
|
||||||
|
MaxUploadFileSize = 4000 * 524288 // 4000 * 512 KB
|
||||||
|
)
|
||||||
|
|
||||||
type Telegram struct {
|
type Telegram struct {
|
||||||
config storconfig.TelegramStorageConfig
|
config storconfig.TelegramStorageConfig
|
||||||
limiter *rate.Limiter
|
limiter *rate.Limiter
|
||||||
@@ -65,22 +75,39 @@ func (t *Telegram) Exists(ctx context.Context, storagePath string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) error {
|
func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) error {
|
||||||
if err := t.limiter.Wait(ctx); err != nil {
|
tctx := tgutil.ExtFromContext(ctx)
|
||||||
return fmt.Errorf("rate limit failed: %w", err)
|
if tctx == nil {
|
||||||
|
return fmt.Errorf("failed to get telegram context")
|
||||||
|
}
|
||||||
|
size := func() int64 {
|
||||||
|
if length := ctx.Value(ctxkey.ContentLength); length != nil {
|
||||||
|
if l, ok := length.(int64); ok {
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1 // unknown size
|
||||||
|
}()
|
||||||
|
if t.config.SkipLarge && size > MaxUploadFileSize {
|
||||||
|
log.FromContext(ctx).Warnf("Skipping file larger than Telegram limit (%d bytes): %d bytes", MaxUploadFileSize, size)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
rs, seekable := r.(io.ReadSeeker)
|
rs, seekable := r.(io.ReadSeeker)
|
||||||
if !seekable || rs == nil {
|
if !seekable || rs == nil {
|
||||||
return fmt.Errorf("reader must implement io.ReadSeeker")
|
return fmt.Errorf("reader must implement io.ReadSeeker")
|
||||||
}
|
}
|
||||||
tctx := tgutil.ExtFromContext(ctx)
|
splitSize := t.config.SplitSizeMB * 1024 * 1024
|
||||||
if tctx == nil {
|
if splitSize <= 0 {
|
||||||
return fmt.Errorf("failed to get telegram context")
|
splitSize = DefaultSplitSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := t.limiter.Wait(ctx); err != nil {
|
||||||
|
return fmt.Errorf("rate limit failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// 去除前导斜杠并分隔路径, 当 len(parts):
|
// 去除前导斜杠并分隔路径, 当 len(parts):
|
||||||
// ==0, 存储到配置文件中的 chat_id, 随机文件名
|
// ==0, 存储到配置文件中的 chat_id, 随机文件名
|
||||||
// ==1, 视作只有文件名, 存储到配置文件中的 chat_id
|
// ==1, 视作只有文件名, 存储到配置文件中的 chat_id
|
||||||
// ==2, parts[0]: 视作要存储到的 chat_id, parts[1]: filename
|
// ==2, parts[0]: 视作要存储到的 chat_id, parts[1]: filename
|
||||||
|
|
||||||
parts := slice.Compact(strings.Split(strings.TrimPrefix(storagePath, "/"), "/"))
|
parts := slice.Compact(strings.Split(strings.TrimPrefix(storagePath, "/"), "/"))
|
||||||
filename := ""
|
filename := ""
|
||||||
chatID := t.config.ChatID
|
chatID := t.config.ChatID
|
||||||
@@ -113,17 +140,13 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er
|
|||||||
}
|
}
|
||||||
upler := uploader.NewUploader(tctx.Raw).
|
upler := uploader.NewUploader(tctx.Raw).
|
||||||
WithPartSize(tglimit.MaxUploadPartSize).
|
WithPartSize(tglimit.MaxUploadPartSize).
|
||||||
WithThreads(config.C().Threads)
|
WithThreads(dlutil.BestThreads(size, config.C().Threads))
|
||||||
|
if size > splitSize {
|
||||||
|
// large file, use split uploader
|
||||||
|
return t.splitUpload(tctx, rs, filename, upler, peer, size, splitSize)
|
||||||
|
}
|
||||||
|
|
||||||
var file tg.InputFileClass
|
var file tg.InputFileClass
|
||||||
size := func() int64 {
|
|
||||||
if length := ctx.Value(ctxkey.ContentLength); length != nil {
|
|
||||||
if l, ok := length.(int64); ok {
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1 // unknown size
|
|
||||||
}()
|
|
||||||
if size < 0 {
|
if size < 0 {
|
||||||
file, err = upler.FromReader(ctx, filename, rs)
|
file, err = upler.FromReader(ctx, filename, rs)
|
||||||
} else {
|
} else {
|
||||||
@@ -186,3 +209,91 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er
|
|||||||
func (t *Telegram) CannotStream() string {
|
func (t *Telegram) CannotStream() string {
|
||||||
return "Telegram storage must use a ReaderSeeker"
|
return "Telegram storage must use a ReaderSeeker"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Telegram) splitUpload(ctx *ext.Context, rs io.ReadSeeker, filename string, upler *uploader.Uploader, peer tg.InputPeerClass, fileSize, splitSize int64) error {
|
||||||
|
tempId := xid.New().String()
|
||||||
|
outputBase := filepath.Join(config.C().Temp.BasePath, tempId, strings.Split(filename, ".")[0])
|
||||||
|
defer func() {
|
||||||
|
// cleanup temp files
|
||||||
|
if err := os.RemoveAll(filepath.Join(config.C().Temp.BasePath, tempId)); err != nil {
|
||||||
|
log.FromContext(ctx).Warnf("Failed to cleanup temp split files: %s", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if err := CreateSplitZip(ctx, rs, fileSize, filename, outputBase, splitSize); err != nil {
|
||||||
|
return fmt.Errorf("failed to create split zip: %w", err)
|
||||||
|
}
|
||||||
|
matched, err := filepath.Glob(outputBase + ".z*")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to glob split files: %w", err)
|
||||||
|
}
|
||||||
|
inputFiles := make([]tg.InputFileClass, 0, len(matched))
|
||||||
|
for _, partPath := range matched {
|
||||||
|
// 串行上传, 不然容易被tg风控
|
||||||
|
err = func() error {
|
||||||
|
partFile, err := os.Open(partPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open split part %s: %w", partPath, err)
|
||||||
|
}
|
||||||
|
defer partFile.Close()
|
||||||
|
partInfo, err := partFile.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to stat split part %s: %w", partPath, err)
|
||||||
|
}
|
||||||
|
partFileSize := partInfo.Size()
|
||||||
|
partName := filepath.Base(partPath)
|
||||||
|
partInputFile, err := upler.Upload(ctx, uploader.NewUpload(partName, partFile, partFileSize))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to upload split part %s: %w", partPath, err)
|
||||||
|
}
|
||||||
|
inputFiles = append(inputFiles, partInputFile)
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to upload split part %s: %w", partPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(inputFiles) == 1 {
|
||||||
|
// only one part, send as normal file
|
||||||
|
// shoud not happen as we already check fileSize > splitSize
|
||||||
|
doc := message.UploadedDocument(inputFiles[0]).
|
||||||
|
Filename(filepath.Base(matched[0])).
|
||||||
|
ForceFile(true).
|
||||||
|
MIME("application/zip")
|
||||||
|
_, err = ctx.Sender.
|
||||||
|
WithUploader(upler).
|
||||||
|
To(peer).
|
||||||
|
Media(ctx, doc)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
multiMedia := make([]message.MultiMediaOption, 0, len(inputFiles))
|
||||||
|
for i, inputFile := range inputFiles {
|
||||||
|
doc := message.UploadedDocument(inputFile).
|
||||||
|
Filename(filepath.Base(matched[i])).
|
||||||
|
MIME("application/zip")
|
||||||
|
multiMedia = append(multiMedia, doc)
|
||||||
|
}
|
||||||
|
|
||||||
|
sender := ctx.Sender
|
||||||
|
|
||||||
|
if len(multiMedia) <= 10 {
|
||||||
|
_, err = sender.WithUploader(upler).
|
||||||
|
To(peer).
|
||||||
|
Album(ctx, multiMedia[0], multiMedia[1:]...)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// more than 10 parts, send in batches, each batch up to 10 parts
|
||||||
|
for i := 0; i < len(multiMedia); i += 10 {
|
||||||
|
end := min(i+10, len(multiMedia))
|
||||||
|
batch := multiMedia[i:end]
|
||||||
|
_, err = sender.WithUploader(upler).
|
||||||
|
To(peer).
|
||||||
|
Album(ctx, batch[0], batch[1:]...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to send album batch: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user