Compare commits

..

7 Commits

Author SHA1 Message Date
krau
c0b4580e34 fix: correct split size calculation in Save method 2025-12-16 20:52:17 +08:00
krau
280fd6ead8 fix: update DefaultSplitSize 2025-12-16 20:51:11 +08:00
krau
0ca3d97711 feat: add task command to client and Title method to Task for tasks queue managing, #157 2025-12-15 11:29:55 +08:00
krau
51198a1e3d fix: remove redundant cancellation in Done method 2025-12-15 11:15:17 +08:00
krau
651835c467 feat: refactor queue to remove unused methods and add comments 2025-12-15 10:49:40 +08:00
krau
45c978980c feat: add support for splitting large files into parts for Telegram storage, #156 2025-12-15 10:25:50 +08:00
krau
c21ff7e499 feat: add direct links download functionality
- Implemented a new task type for handling direct links downloads.
- Added command handler for downloading multiple links via /dl command.
- Introduced progress tracking for direct link downloads.
- Enhanced filename parsing to support various encoding scenarios.
- Updated enums to include direct links as a task type.
- Refactored existing task structures to accommodate new functionality.
- Improved error handling and logging throughout the download process.
2025-12-08 17:10:41 +08:00
33 changed files with 1400 additions and 255 deletions

View File

@@ -80,8 +80,10 @@ func handleAddCallback(ctx *ext.Context, update *ext.Update) error {
dirPath = path.Join(dirPath, fsutil.NormalizePathname(data.ParsedItem.Title))
}
shortcut.CreateAndAddParsedTaskWithEdit(ctx, selectedStorage, dirPath, data.ParsedItem, msgID, userID)
case tasktype.TaskTypeDirectlinks:
shortcut.CreateAndAddDirectTaskWithEdit(ctx, selectedStorage, dirPath, data.DirectLinks, msgID, userID)
default:
log.FromContext(ctx).Errorf("Unsupported task type: %s", data.TaskType)
return fmt.Errorf("unexcept task type: %s", data.TaskType)
}
return dispatcher.EndGroups
}

View File

@@ -26,3 +26,20 @@ func handleCancelCallback(ctx *ext.Context, update *ext.Update) error {
return dispatcher.EndGroups
}
func handleCancelCmd(ctx *ext.Context, update *ext.Update) error {
logger := log.FromContext(ctx)
args := strings.Fields(update.EffectiveMessage.Text)
if len(args) < 2 {
ctx.Reply(update, ext.ReplyTextString("用法: /cancel <task_id>"), nil)
return dispatcher.EndGroups
}
taskID := args[1]
if err := core.CancelTask(ctx, taskID); err != nil {
logger.Errorf("failed to cancel task %s: %v", taskID, err)
ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
return dispatcher.EndGroups
}
ctx.Reply(update, ext.ReplyTextString("已请求取消任务: "+taskID), nil)
return dispatcher.EndGroups
}

49
client/bot/handlers/dl.go Normal file
View File

@@ -0,0 +1,49 @@
package handlers
import (
"fmt"
"net/url"
"strings"
"github.com/celestix/gotgproto/ext"
"github.com/charmbracelet/log"
"github.com/duke-git/lancet/v2/slice"
"github.com/krau/SaveAny-Bot/client/bot/handlers/utils/msgelem"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/tcbdata"
"github.com/krau/SaveAny-Bot/storage"
)
func handleDlCmd(ctx *ext.Context, update *ext.Update) error {
logger := log.FromContext(ctx)
args := strings.Split(update.EffectiveMessage.Text, " ")
if len(args) < 2 {
ctx.Reply(update, ext.ReplyTextString("用法: /dl <链接1> <链接2> ..."), nil)
return nil
}
links := args[1:]
for i, link := range links {
links[i] = strings.TrimSpace(link)
u, err := url.Parse(link)
if err != nil || u.Scheme == "" || u.Host == "" {
logger.Warn("invaild link", link)
links[i] = ""
}
}
links = slice.Compact(links)
if len(links) == 0 {
ctx.Reply(update, ext.ReplyTextString("没有有效的链接可供下载"), nil)
return nil
}
markup, err := msgelem.BuildAddSelectStorageKeyboard(storage.GetUserStorages(ctx, update.GetUserChat().GetID()), tcbdata.Add{
TaskType: tasktype.TaskTypeDirectlinks,
DirectLinks: links,
})
if err != nil {
return err
}
ctx.Reply(update, ext.ReplyTextString(fmt.Sprintf("共 %d 个文件, 请选择存储位置", len(links))), &ext.ReplyOpts{
Markup: markup,
})
return nil
}

View File

@@ -26,15 +26,18 @@ var CommandHandlers = []DescCommandHandler{
{"storage", "设置默认存储端", handleStorageCmd},
{"dir", "管理存储文件夹", handleDirCmd},
{"rule", "管理自动存储规则", handleRuleCmd},
{"save", "保存文件", handleSilentMode(handleSaveCmd, handleSilentSaveReplied)},
{"dl", "下载给定链接的文件", handleDlCmd},
{"task", "管理任务队列", handleTaskCmd},
{"cancel", "取消任务", handleCancelCmd},
{"watch", "监听聊天(UserBot)", handleWatchCmd},
{"unwatch", "取消监听聊天(UserBot)", handleUnwatchCmd},
{"lswatch", "列出监听的聊天(UserBot)", handleLswatchCmd},
{"save", "保存文件", handleSilentMode(handleSaveCmd, handleSilentSaveReplied)},
{"config", "修改配置", handleConfigCmd},
{"fnametmpl", "设置文件命名模板", handleConfigFnameTmpl},
{"update", "检查更新", handleUpdateCmd},
{"help", "显示帮助", handleHelpCmd},
{"parser", "管理解析器", handleParserCmd},
{"update", "检查更新", handleUpdateCmd},
}
func Register(disp dispatcher.Dispatcher) {

View File

@@ -26,7 +26,7 @@ import (
func handleSaveCmd(ctx *ext.Context, update *ext.Update) error {
logger := log.FromContext(ctx)
args := strings.Split(string(update.EffectiveMessage.Text), " ")
args := strings.Split(update.EffectiveMessage.Text, " ")
if len(args) >= 3 {
return handleBatchSave(ctx, update, args[1:])
}
@@ -35,17 +35,6 @@ func handleSaveCmd(ctx *ext.Context, update *ext.Update) error {
ctx.Reply(update, ext.ReplyTextString(i18n.T(i18nk.BotMsgSaveHelpText)), nil)
return dispatcher.EndGroups
}
// genFilename := func() string {
// if len(args) > 1 {
// return args[1]
// }
// filename := tgutil.GenFileNameFromMessage(*replyTo.Message)
// return filename
// }()
// option := tfile.WithNameIfEmpty(genFilename)
// if len(args) > 1 {
// option = tfile.WithName(genFilename)
// }
userDB, err := database.GetUserByChatID(ctx, update.GetUserChat().GetID())
if err != nil {
return err

View File

@@ -0,0 +1,113 @@
package handlers
import (
"fmt"
"strings"
"time"
"github.com/celestix/gotgproto/dispatcher"
"github.com/celestix/gotgproto/ext"
"github.com/charmbracelet/log"
"github.com/gotd/td/telegram/message/styling"
"github.com/krau/SaveAny-Bot/core"
)
func handleTaskCmd(ctx *ext.Context, update *ext.Update) error {
logger := log.FromContext(ctx)
args := strings.Fields(update.EffectiveMessage.Text)
if len(args) == 1 {
showRunningTasks(ctx, update)
return dispatcher.EndGroups
}
switch args[1] {
case "running", "run", "r":
showRunningTasks(ctx, update)
case "queued", "queue", "q", "waiting":
showQueuedTasks(ctx, update)
case "cancel", "c":
if len(args) < 3 {
ctx.Reply(update, ext.ReplyTextString("用法: /tasks cancel <task_id>"), nil)
return dispatcher.EndGroups
}
taskID := args[2]
if err := core.CancelTask(ctx, taskID); err != nil {
logger.Errorf("取消任务 %s 失败: %v", taskID, err)
ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
return dispatcher.EndGroups
}
ctx.Reply(update, ext.ReplyTextStyledTextArray([]styling.StyledTextOption{
styling.Plain("已请求取消任务: "),
styling.Code(taskID),
}), nil)
default:
ctx.Reply(update, ext.ReplyTextString("用法: /tasks [running|queued|cancel <task_id>]"), nil)
}
return dispatcher.EndGroups
}
func showRunningTasks(ctx *ext.Context, update *ext.Update) {
tasks := core.GetRunningTasks(ctx)
if len(tasks) == 0 {
ctx.Reply(update, ext.ReplyTextString("当前没有正在运行的任务"), nil)
return
}
opts := make([]styling.StyledTextOption, 0, 2+len(tasks)*4)
opts = append(opts,
styling.Bold("当前正在运行的任务:"),
styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
)
for _, t := range tasks {
created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
status := "运行中"
if t.Cancelled {
status = "已请求取消"
}
opts = append(opts,
styling.Plain("\nID: "),
styling.Code(t.ID),
styling.Plain("\n名称: "),
styling.Code(t.Title),
styling.Plain("\n创建时间: "),
styling.Code(created),
styling.Plain("\n状态: "),
styling.Code(status),
)
}
ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
}
func showQueuedTasks(ctx *ext.Context, update *ext.Update) {
tasks := core.GetQueuedTasks(ctx)
if len(tasks) == 0 {
ctx.Reply(update, ext.ReplyTextString("当前没有排队中的任务"), nil)
return
}
opts := make([]styling.StyledTextOption, 0, 2+len(tasks)*3)
opts = append(opts,
styling.Bold("当前排队中的任务:"),
styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
)
for _, t := range tasks {
created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
status := "排队中"
if t.Cancelled {
status = "已请求取消"
}
opts = append(opts,
styling.Plain("\nID: "),
styling.Code(t.ID),
styling.Plain("\n名称: "),
styling.Code(t.Title),
styling.Plain("\n创建时间: "),
styling.Code(created),
styling.Plain("\n状态: "),
styling.Code(status),
)
if len(tasks) > 10 {
opts = append(opts, styling.Plain("\n...\n只显示前 10 个任务, 共 "+fmt.Sprintf("%d", len(tasks))+" 个任务"))
break
}
}
ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
}

View File

@@ -45,6 +45,8 @@ func BuildAddSelectStorageKeyboard(stors []storage.Storage, adddata tcbdata.Add)
TphDirPath: adddata.TphDirPath,
ParsedItem: adddata.ParsedItem,
DirectLinks: adddata.DirectLinks,
}
dataid := xid.New().String()
err := cache.Set(dataid, data)

View File

@@ -0,0 +1,30 @@
package shortcut
import (
"github.com/celestix/gotgproto/dispatcher"
"github.com/celestix/gotgproto/ext"
"github.com/charmbracelet/log"
"github.com/gotd/td/tg"
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/core/tasks/directlinks"
"github.com/krau/SaveAny-Bot/storage"
"github.com/rs/xid"
)
func CreateAndAddDirectTaskWithEdit(ctx *ext.Context, stor storage.Storage, dirPath string, links []string, msgID int, userID int64) error {
injectCtx := tgutil.ExtWithContext(ctx.Context, ctx)
task := directlinks.NewTask(xid.New().String(), injectCtx, links, stor, stor.JoinStoragePath(dirPath), directlinks.NewProgress(msgID, userID))
if err := core.AddTask(injectCtx, task); err != nil {
log.FromContext(ctx).Errorf("Failed to add task: %s", err)
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
ID: msgID,
Message: "任务添加失败: " + err.Error(),
})
return dispatcher.EndGroups
}
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
Message: "任务已添加",
})
return dispatcher.EndGroups
}

View File

@@ -1,6 +1,8 @@
package ioutil
import "io"
import (
"io"
)
type ProgressWriterAt struct {
wrAt io.WriterAt
@@ -46,4 +48,4 @@ func NewProgressWriter(
wr: wr,
onWrite: onWrite,
}
}
}

View File

@@ -12,6 +12,11 @@ type TelegramStorageConfig struct {
ForceFile bool `toml:"force_file" mapstructure:"force_file" json:"force_file"`
RateLimit int `toml:"rate_limit" mapstructure:"rate_limit" json:"rate_limit"`
RateBurst int `toml:"rate_burst" mapstructure:"rate_burst" json:"rate_burst"`
SkipLarge bool `toml:"skip_large" mapstructure:"skip_large" json:"skip_large"` // skip files larger than Telegram limit(2GB)
// split files larger than Telegram limit(2GB) into parts of specified size, in MB, leave 0 to set default(2000MB)
// only effective when SkipLarge is false
// use zip when splitting
SplitSizeMB int64 `toml:"split_size_mb" mapstructure:"split_size_mb" json:"split_size_mb"`
}
func (m *TelegramStorageConfig) Validate() error {

View File

@@ -10,15 +10,16 @@ import (
"github.com/krau/SaveAny-Bot/pkg/queue"
)
var queueInstance *queue.TaskQueue[Exectable]
var queueInstance *queue.TaskQueue[Executable]
type Exectable interface {
type Executable interface {
Type() tasktype.TaskType
Title() string
TaskID() string
Execute(ctx context.Context) error
}
func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan struct{}) {
func worker(ctx context.Context, qe *queue.TaskQueue[Executable], semaphore chan struct{}) {
logger := log.FromContext(ctx)
execHooks := config.C().Hook.Exec
for {
@@ -28,27 +29,27 @@ func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan
logger.Error("Failed to get task from queue:", err)
break // queue closed and empty
}
task := qtask.Data
logger.Infof("Processing task: %s", task.TaskID())
exe := qtask.Data
logger.Infof("Processing task: %s", exe.TaskID())
if err := ExecCommandString(qtask.Context(), execHooks.TaskBeforeStart); err != nil {
logger.Errorf("Failed to execute before start hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute before start hook for task %s: %v", exe.TaskID(), err)
}
if err := task.Execute(qtask.Context()); err != nil {
if err := exe.Execute(qtask.Context()); err != nil {
if errors.Is(err, context.Canceled) {
logger.Infof("Task %s was canceled", task.TaskID())
logger.Infof("Task %s was canceled", exe.TaskID())
if err := ExecCommandString(ctx, execHooks.TaskCancel); err != nil {
logger.Errorf("Failed to execute cancel hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute cancel hook for task %s: %v", exe.TaskID(), err)
}
} else {
logger.Errorf("Failed to execute task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute task %s: %v", exe.TaskID(), err)
if err := ExecCommandString(ctx, execHooks.TaskFail); err != nil {
logger.Errorf("Failed to execute fail hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute fail hook for task %s: %v", exe.TaskID(), err)
}
}
} else {
logger.Infof("Task %s completed successfully", task.TaskID())
logger.Infof("Task %s completed successfully", exe.TaskID())
if err := ExecCommandString(ctx, execHooks.TaskSuccess); err != nil {
logger.Errorf("Failed to execute success hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute success hook for task %s: %v", exe.TaskID(), err)
}
}
qe.Done(qtask.ID)
@@ -60,7 +61,7 @@ func Run(ctx context.Context) {
log.FromContext(ctx).Info("Start processing tasks...")
semaphore := make(chan struct{}, config.C().Workers)
if queueInstance == nil {
queueInstance = queue.NewTaskQueue[Exectable]()
queueInstance = queue.NewTaskQueue[Executable]()
}
for range config.C().Workers {
go worker(ctx, queueInstance, semaphore)
@@ -68,8 +69,8 @@ func Run(ctx context.Context) {
}
func AddTask(ctx context.Context, task Exectable) error {
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task))
func AddTask(ctx context.Context, task Executable) error {
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task.Title(), task))
}
func CancelTask(ctx context.Context, id string) error {
@@ -78,8 +79,13 @@ func CancelTask(ctx context.Context, id string) error {
}
func GetLength(ctx context.Context) int {
if queueInstance == nil {
return 0
}
return queueInstance.ActiveLength()
}
func GetRunningTasks(ctx context.Context) []queue.TaskInfo {
return queueInstance.RunningTasks()
}
func GetQueuedTasks(ctx context.Context) []queue.TaskInfo {
return queueInstance.QueuedTasks()
}

View File

@@ -24,7 +24,7 @@ func (t *Task) Execute(ctx context.Context) error {
workers := config.C().Workers
eg, gctx := errgroup.WithContext(ctx)
eg.SetLimit(workers)
for _, elem := range t.Elems {
for _, elem := range t.elems {
eg.Go(func() error {
t.processingMu.RLock()
if t.processing[elem.ID] != nil {

View File

@@ -8,12 +8,15 @@ import (
"sync/atomic"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/tfile"
"github.com/krau/SaveAny-Bot/storage"
"github.com/rs/xid"
)
var _ core.Executable = (*Task)(nil)
type TaskElement struct {
ID string
Storage storage.Storage
@@ -25,8 +28,8 @@ type TaskElement struct {
type Task struct {
ID string
Ctx context.Context
Elems []TaskElement
ctx context.Context
elems []TaskElement
Progress ProgressTracker
IgnoreErrors bool // if true, errors during processing will be ignored
downloaded atomic.Int64
@@ -36,6 +39,11 @@ type Task struct {
failed map[string]error // [TODO] errors for each element
}
// Title implements core.Exectable.
func (t *Task) Title() string {
return fmt.Sprintf("[%s](%d files/%.2fMB)", t.Type(), len(t.elems), float64(t.totalSize)/(1024*1024))
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeTgfiles
}
@@ -78,8 +86,8 @@ func NewBatchTGFileTask(
) *Task {
task := &Task{
ID: id,
Ctx: ctx,
Elems: files,
ctx: ctx,
elems: files,
Progress: progress,
downloaded: atomic.Int64{},
totalSize: func() int64 {

View File

@@ -44,11 +44,11 @@ func (t *Task) Downloaded() int64 {
}
func (t *Task) Count() int {
return len(t.Elems)
return len(t.elems)
}
func (t *Task) Processing() []TaskElementInfo {
processing := make([]TaskElementInfo, 0, len(t.Elems))
processing := make([]TaskElementInfo, 0, len(t.elems))
for _, elem := range t.processing {
processing = append(processing, elem)
}

View File

@@ -0,0 +1,167 @@
package directlinks
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"sync/atomic"
"github.com/charmbracelet/log"
"github.com/duke-git/lancet/v2/retry"
"github.com/krau/SaveAny-Bot/common/utils/fsutil"
"github.com/krau/SaveAny-Bot/common/utils/ioutil"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
"golang.org/x/sync/errgroup"
)
func (t *Task) Execute(ctx context.Context) error {
logger := log.FromContext(ctx)
logger.Infof("Starting directlinks task %s", t.ID)
if t.Progress != nil {
t.Progress.OnStart(ctx, t)
}
// head all links to get file info
eg, gctx := errgroup.WithContext(ctx)
eg.SetLimit(config.C().Workers)
fetchedTotalBytes := atomic.Int64{}
for _, file := range t.files {
eg.Go(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodHead, file.URL, nil)
if err != nil {
return fmt.Errorf("failed to create HEAD request for %s: %w", file.URL, err)
}
resp, err := t.client.Do(req)
if err != nil {
return fmt.Errorf("failed to HEAD %s: %w", file.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("HEAD %s returned status %d", file.URL, resp.StatusCode)
}
fetchedTotalBytes.Add(resp.ContentLength)
file.Size = resp.ContentLength
if name := resp.Header.Get("Content-Disposition"); name != "" {
// Set file name
filename := parseFilename(name)
file.Name = filename
}
return nil
})
}
err := eg.Wait()
if err != nil {
logger.Errorf("Error during HEAD requests: %v", err)
if t.Progress != nil {
t.Progress.OnDone(ctx, t, err)
}
return err
}
t.totalBytes = fetchedTotalBytes.Load()
// start downloading
eg, gctx = errgroup.WithContext(ctx)
eg.SetLimit(config.C().Workers)
for _, file := range t.files {
eg.Go(func() error {
t.processingMu.RLock()
if _, ok := t.processing[file.URL]; ok {
return fmt.Errorf("file %s is already being processed", file.URL)
}
t.processingMu.RUnlock()
t.processingMu.Lock()
t.processing[file.URL] = file
t.processingMu.Unlock()
defer func() {
t.processingMu.Lock()
delete(t.processing, file.URL)
t.processingMu.Unlock()
}()
err := t.processLink(gctx, file)
t.downloaded.Add(1)
if errors.Is(err, context.Canceled) {
logger.Debug("Link processing canceled")
return err
}
if err != nil {
logger.Errorf("Error processing link %s: %v", file.URL, err)
return fmt.Errorf("failed to process link %s: %w", file.URL, err)
}
return nil
})
}
err = eg.Wait()
if err != nil {
logger.Errorf("Error during directlinks task execution: %v", err)
} else {
logger.Infof("Directlinks task %s completed successfully", t.ID)
}
if t.Progress != nil {
t.Progress.OnDone(ctx, t, err)
}
return err
}
func (t *Task) processLink(ctx context.Context, file *File) error {
logger := log.FromContext(ctx)
err := retry.Retry(func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, file.URL, nil)
if err != nil {
return fmt.Errorf("failed to create GET request for %s: %w", file.URL, err)
}
resp, err := t.client.Do(req)
if err != nil {
return fmt.Errorf("failed to GET %s: %w", file.URL, err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("GET %s returned status %d", file.URL, resp.StatusCode)
}
ctx = context.WithValue(ctx, ctxkey.ContentLength, file.Size)
if t.stream {
return t.Storage.Save(ctx, resp.Body, filepath.Join(t.StorPath, file.Name))
}
cacheFile, err := fsutil.CreateFile(filepath.Join(config.C().Temp.BasePath,
fmt.Sprintf("direct_%s_%s", t.ID, file.Name)))
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer func() {
if err := cacheFile.CloseAndRemove(); err != nil {
logger.Errorf("Failed to close and remove cache file: %v", err)
}
}()
wr := ioutil.NewProgressWriter(cacheFile, func(n int) {
t.downloadedBytes.Add(int64(n))
if t.Progress != nil {
t.Progress.OnProgress(ctx, t)
}
})
copyResultCh := make(chan error, 1)
go func() {
_, err := io.Copy(wr, resp.Body)
copyResultCh <- err
}()
select {
case err := <-copyResultCh:
if err != nil {
return fmt.Errorf("failed to copy file %s to cache file: %w", file.URL, err)
}
case <-ctx.Done():
return ctx.Err()
}
_, err = cacheFile.Seek(0, 0)
if err != nil {
return fmt.Errorf("failed to seek cache file for resource %s: %w", file.URL, err)
}
return t.Storage.Save(ctx, cacheFile, filepath.Join(t.StorPath, file.Name))
}, retry.RetryTimes(uint(config.C().Retry)), retry.Context(ctx))
if ctx.Err() != nil {
return ctx.Err()
}
return err
}

View File

@@ -0,0 +1,196 @@
package directlinks
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/charmbracelet/log"
"github.com/duke-git/lancet/v2/slice"
"github.com/gotd/td/telegram/message/entity"
"github.com/gotd/td/telegram/message/styling"
"github.com/gotd/td/tg"
"github.com/krau/SaveAny-Bot/common/utils/dlutil"
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
)
type TaskInfo interface {
TotalBytes() int64
TotalFiles() int
TaskID() string
StorageName() string
StoragePath() string
DownloadedBytes() int64
Processing() []FileInfo
}
type FileInfo interface {
FileName() string
FileSize() int64
}
type ProgressTracker interface {
OnStart(ctx context.Context, info TaskInfo)
OnProgress(ctx context.Context, info TaskInfo)
OnDone(ctx context.Context, info TaskInfo, err error)
}
type Progress struct {
msgID int
chatID int64
start time.Time
lastUpdatePercent atomic.Int32
}
// OnDone implements ProgressTracker.
func (p *Progress) OnDone(ctx context.Context, info TaskInfo, err error) {
logger := log.FromContext(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
logger.Infof("Parsed task %s was canceled", info.TaskID())
ext := tgutil.ExtFromContext(ctx)
if ext != nil {
ext.EditMessage(p.chatID, &tg.MessagesEditMessageRequest{
ID: p.msgID,
Message: fmt.Sprintf("处理已取消: %s", info.TaskID()),
})
}
} else {
logger.Errorf("Parsed task %s failed: %s", info.TaskID(), err)
ext := tgutil.ExtFromContext(ctx)
if ext != nil {
ext.EditMessage(p.chatID, &tg.MessagesEditMessageRequest{
ID: p.msgID,
Message: fmt.Sprintf("处理失败: %s", err.Error()),
})
}
}
return
}
logger.Infof("Parsed task %s completed successfully", info.TaskID())
entityBuilder := entity.Builder{}
if err := styling.Perform(&entityBuilder,
styling.Plain("处理完成, 文件数量: "),
styling.Code(fmt.Sprintf("%d", info.TotalFiles())),
styling.Plain("\n保存路径: "),
styling.Code(fmt.Sprintf("[%s]:%s", info.StorageName(), info.StoragePath())),
); err != nil {
logger.Errorf("Failed to build entities: %s", err)
return
}
text, entities := entityBuilder.Complete()
req := &tg.MessagesEditMessageRequest{
ID: p.msgID,
}
req.SetMessage(text)
req.SetEntities(entities)
ext := tgutil.ExtFromContext(ctx)
if ext != nil {
ext.EditMessage(p.chatID, req)
}
}
// OnProgress implements ProgressTracker.
func (p *Progress) OnProgress(ctx context.Context, info TaskInfo) {
if !shouldUpdateProgress(info.TotalBytes(), info.DownloadedBytes(), int(p.lastUpdatePercent.Load())) {
return
}
percent := int((info.DownloadedBytes() * 100) / info.TotalBytes())
if p.lastUpdatePercent.Load() == int32(percent) {
return
}
p.lastUpdatePercent.Store(int32(percent))
log.FromContext(ctx).Debugf("Progress update: %s, %d/%d", info.TaskID(), info.DownloadedBytes(), info.TotalBytes())
entityBuilder := entity.Builder{}
var entities []tg.MessageEntityClass
if err := styling.Perform(&entityBuilder,
styling.Plain("正在下载\n总大小: "),
styling.Code(fmt.Sprintf("%.2f MB (%d个文件)", float64(info.TotalBytes())/(1024*1024), info.TotalFiles())),
styling.Plain("\n正在处理:\n"),
func() styling.StyledTextOption {
var lines []string
for _, elem := range info.Processing() {
lines = append(lines, fmt.Sprintf(" - %s (%.2f MB)", elem.FileName(), float64(elem.FileSize())/(1024*1024)))
}
if len(lines) == 0 {
lines = append(lines, " - 无")
}
return styling.Plain(slice.Join(lines, "\n"))
}(),
styling.Plain("\n平均速度: "),
styling.Bold(fmt.Sprintf("%.2f MB/s", dlutil.GetSpeed(info.DownloadedBytes(), p.start)/(1024*1024))),
styling.Plain("\n当前进度: "),
styling.Bold(fmt.Sprintf("%.2f%%", float64(info.DownloadedBytes())/float64(info.TotalBytes())*100)),
); err != nil {
log.FromContext(ctx).Errorf("Failed to build entities: %s", err)
return
}
text, entities := entityBuilder.Complete()
req := &tg.MessagesEditMessageRequest{
ID: p.msgID,
}
req.SetMessage(text)
req.SetEntities(entities)
req.SetReplyMarkup(&tg.ReplyInlineMarkup{
Rows: []tg.KeyboardButtonRow{
{
Buttons: []tg.KeyboardButtonClass{
tgutil.BuildCancelButton(info.TaskID()),
},
},
}},
)
ext := tgutil.ExtFromContext(ctx)
if ext != nil {
ext.EditMessage(p.chatID, req)
return
}
}
// OnStart implements ProgressTracker.
func (p *Progress) OnStart(ctx context.Context, info TaskInfo) {
logger := log.FromContext(ctx)
p.start = time.Now()
p.lastUpdatePercent.Store(0)
logger.Infof("Direct links task started: message_id=%d, chat_id=%d", p.msgID, p.chatID)
ext := tgutil.ExtFromContext(ctx)
if ext == nil {
return
}
entityBuilder := entity.Builder{}
var entities []tg.MessageEntityClass
if err := styling.Perform(&entityBuilder,
styling.Plain(fmt.Sprintf("开始下载, 总大小: %.2f MB (%d 个文件)", float64(info.TotalBytes())/(1024*1024), info.TotalFiles()))); err != nil {
log.FromContext(ctx).Errorf("Failed to build entities: %s", err)
return
}
text, entities := entityBuilder.Complete()
req := &tg.MessagesEditMessageRequest{
ID: p.msgID,
}
req.SetMessage(text)
req.SetEntities(entities)
req.SetReplyMarkup(&tg.ReplyInlineMarkup{
Rows: []tg.KeyboardButtonRow{
{
Buttons: []tg.KeyboardButtonClass{
tgutil.BuildCancelButton(info.TaskID()),
},
},
}},
)
ext.EditMessage(p.chatID, req)
}
var _ ProgressTracker = (*Progress)(nil)
func NewProgress(msgID int, userID int64) ProgressTracker {
return &Progress{
msgID: msgID,
chatID: userID,
}
}

View File

@@ -0,0 +1,130 @@
package directlinks
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/storage"
)
type File struct {
Name string
URL string
Size int64
}
func (f *File) FileName() string {
return f.Name
}
func (f *File) FileSize() int64 {
return f.Size
}
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
ctx context.Context
files []*File
Storage storage.Storage
StorPath string
Progress ProgressTracker
client *http.Client // [TODO] parallel download
stream bool
totalBytes int64 // total bytes to download
downloadedBytes atomic.Int64 // downloaded bytes
totalFiles int64 // total files to download
downloaded atomic.Int64 // downloaded files count
processing map[string]*File // {"url": File}
processingMu sync.RWMutex
failed map[string]error // [TODO] errors for each file
}
// Title implements core.Exectable.
func (t *Task) Title() string {
return fmt.Sprintf("[%s](%s...->%s:%s)", t.Type(), t.files[0].Name, t.Storage.Name(), t.StorPath)
}
// DownloadedBytes implements TaskInfo.
func (t *Task) DownloadedBytes() int64 {
return t.downloadedBytes.Load()
}
// Processing implements TaskInfo.
func (t *Task) Processing() []FileInfo {
t.processingMu.RLock()
defer t.processingMu.RUnlock()
infos := make([]FileInfo, 0, len(t.processing))
for _, f := range t.processing {
infos = append(infos, f)
}
return infos
}
// StorageName implements TaskInfo.
func (t *Task) StorageName() string {
return t.Storage.Name()
}
// StoragePath implements TaskInfo.
func (t *Task) StoragePath() string {
return t.StorPath
}
// TotalBytes implements TaskInfo.
func (t *Task) TotalBytes() int64 {
return t.totalBytes
}
// TotalFiles implements TaskInfo.
func (t *Task) TotalFiles() int {
return int(t.totalFiles)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeDirectlinks
}
func (t *Task) TaskID() string {
return t.ID
}
func NewTask(
id string,
ctx context.Context,
links []string,
stor storage.Storage,
storPath string,
progressTracker ProgressTracker,
) *Task {
_, ok := stor.(storage.StorageCannotStream)
stream := config.C().Stream && !ok
files := make([]*File, 0, len(links))
for _, link := range links {
files = append(files, &File{
URL: link,
})
}
return &Task{
ID: id,
ctx: ctx,
files: files,
Storage: stor,
StorPath: storPath,
Progress: progressTracker,
stream: stream,
client: http.DefaultClient,
processing: make(map[string]*File),
processingMu: sync.RWMutex{},
failed: make(map[string]error),
totalFiles: int64(len(files)),
}
}

View File

@@ -0,0 +1,205 @@
package directlinks
import (
"mime"
"net/url"
"strings"
"unicode/utf8"
"golang.org/x/text/encoding/simplifiedchinese"
)
// parseFilename extracts filename from Content-Disposition header
// It handles multiple encoding scenarios:
// 1. RFC 5987/RFC 2231 format: filename*=UTF-8”%E6%B5%8B%E8%AF%95.zip (preferred, checked first)
// 2. MIME encoded-word: filename="=?UTF-8?B?5rWL6K+VLnppcA==?="
// 3. URL-encoded: filename="%E6%B5%8B%E8%AF%95.zip"
// 4. Plain ASCII filename
//
// The key fix is checking filename*= first before mime.ParseMediaType, because
// some servers send Content-Disposition headers with invalid characters that cause
// mime.ParseMediaType to fail, but the filename*= parameter is still valid.
func parseFilename(contentDisposition string) string {
// First, try to find filename*= (RFC 5987 format, most reliable for non-ASCII)
if filename := parseFilenameExtended(contentDisposition); filename != "" {
return filename
}
// Try standard MIME parsing for regular filename= parameter
_, params, err := mime.ParseMediaType(contentDisposition)
if err == nil {
if filename := params["filename"]; filename != "" {
return decodeFilenameParam(filename)
}
}
// Fallback: manual parsing if mime.ParseMediaType fails
return parseFilenameFallback(contentDisposition)
}
// parseFilenameExtended parses RFC 5987/RFC 2231 extended parameter format
// Format: filename*=charset'language'value (e.g., UTF-8”%E6%B5%8B%E8%AF%95.zip)
func parseFilenameExtended(cd string) string {
// Look for filename*= (case-insensitive)
lower := strings.ToLower(cd)
idx := strings.Index(lower, "filename*=")
if idx == -1 {
return ""
}
// Extract the value after filename*=
value := cd[idx+len("filename*="):]
// Find the end of the value (next ; or end of string)
if endIdx := strings.Index(value, ";"); endIdx != -1 {
value = value[:endIdx]
}
value = strings.TrimSpace(value)
// Parse charset'language'encoded-value format
// Common format: UTF-8''%E6%B5%8B%E8%AF%95.zip
parts := strings.SplitN(value, "''", 2)
if len(parts) == 2 {
// parts[0] is charset (e.g., "UTF-8")
// parts[1] is percent-encoded value
decoded, err := url.QueryUnescape(parts[1])
if err == nil {
return decoded
}
}
// Try with single quote delimiter as well (some servers use this)
parts = strings.SplitN(value, "'", 3)
if len(parts) >= 3 {
decoded, err := url.QueryUnescape(parts[2])
if err == nil {
return decoded
}
}
return ""
}
// TryUrlQueryUnescape tries to unescape a URL-encoded string.
//
// If unescaping fails, it returns the original string.
func tryUrlQueryUnescape(s string) string {
if decoded, err := url.QueryUnescape(s); err == nil {
return decoded
}
return s
}
// decodeFilenameParam decodes a filename parameter value
// Handles MIME encoded-word, URL encoding, and GBK encoding fallback
func decodeFilenameParam(filename string) string {
// Check if the filename is MIME encoded-word (e.g., =?UTF-8?B?...?=)
if strings.HasPrefix(filename, "=?") {
decoder := new(mime.WordDecoder)
// Some servers use "UTF8" instead of "UTF-8", create a normalized copy
normalizedFilename := strings.Replace(filename, "UTF8", "UTF-8", 1)
if decoded, err := decoder.Decode(normalizedFilename); err == nil {
return decoded
}
}
// Try URL decoding
decoded := tryUrlQueryUnescape(filename)
// Check if the result is valid UTF-8. If not, try GBK decoding.
// This handles the case where Chinese Windows servers send GBK-encoded filenames
// which appear as garbled characters (e.g., "下载地址.zip" -> "<22><><EFBFBD>ص<EFBFBD>ַ.zip")
if !utf8.ValidString(decoded) {
if gbkDecoded := tryDecodeGBK(decoded); gbkDecoded != "" {
return gbkDecoded
}
}
return decoded
}
// gbkDecoder is a reusable GBK decoder for better performance
var gbkDecoder = simplifiedchinese.GBK.NewDecoder()
// tryDecodeGBK attempts to decode a string as GBK/GB2312/GB18030 encoding
// Returns empty string if decoding fails or result is not valid UTF-8
func tryDecodeGBK(s string) string {
// GBK uses 1-2 bytes per character. Single-byte chars are 0x00-0x7F (ASCII compatible).
// Double-byte chars have first byte 0x81-0xFE and second byte 0x40-0xFE.
// Skip if string is empty or all ASCII (valid UTF-8)
if len(s) == 0 {
return ""
}
// Create a fresh decoder since the transform state may be corrupted
decoder := gbkDecoder
decoded, err := decoder.Bytes([]byte(s))
if err != nil {
return ""
}
result := string(decoded)
if utf8.ValidString(result) {
return result
}
return ""
}
// parseFilenameFallback manually parses filename= when mime.ParseMediaType fails
func parseFilenameFallback(cd string) string {
// Look for filename= (case-insensitive)
lower := strings.ToLower(cd)
idx := strings.Index(lower, "filename=")
if idx == -1 {
return ""
}
// Skip "filename=" prefix
value := cd[idx+len("filename="):]
// Find the end of the value
if endIdx := strings.Index(value, ";"); endIdx != -1 {
value = value[:endIdx]
}
value = strings.TrimSpace(value)
// Remove quotes if present
if len(value) >= 2 {
if (value[0] == '"' && value[len(value)-1] == '"') ||
(value[0] == '\'' && value[len(value)-1] == '\'') {
value = value[1 : len(value)-1]
}
}
return decodeFilenameParam(value)
}
var progressUpdatesLevels = []struct {
size int64 // 文件大小阈值
stepPercent int // 每多少 % 更新一次
}{
{10 << 20, 100},
{50 << 20, 50},
{200 << 20, 20},
{500 << 20, 10},
}
func shouldUpdateProgress(total, downloaded int64, lastUpdatePercent int) bool {
if total <= 0 || downloaded <= 0 {
return false
}
percent := int((downloaded * 100) / total)
if percent <= lastUpdatePercent {
return false
}
step := progressUpdatesLevels[len(progressUpdatesLevels)-1].stepPercent
for _, lvl := range progressUpdatesLevels {
if total < lvl.size {
step = lvl.stepPercent
break
}
}
return percent >= lastUpdatePercent+step
}

View File

@@ -2,24 +2,28 @@ package parsed
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"github.com/krau/SaveAny-Bot/common/utils/netutil"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/parser"
"github.com/krau/SaveAny-Bot/storage"
)
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
Ctx context.Context
Stor storage.Storage
StorPath string
item *parser.Item
httpClient *http.Client
httpClient *http.Client // [TODO] btorrent support?
progress ProgressTracker
stream bool
@@ -32,6 +36,11 @@ type Task struct {
failed map[string]error // [TODO] errors for each resource
}
// Title implements core.Exectable.
func (t *Task) Title() string {
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.item.Title, t.Stor.Name(), t.StorPath)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeParseditem
}

View File

@@ -2,13 +2,17 @@ package telegraph
import (
"context"
"fmt"
"sync/atomic"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/telegraph"
"github.com/krau/SaveAny-Bot/storage"
)
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
Ctx context.Context
@@ -24,6 +28,11 @@ type Task struct {
downloaded atomic.Int64
}
// Title implements core.Exectable.
func (t *Task) Title() string {
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.PhPath, t.Stor.Name(), t.StorPath)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeTphpics
}

View File

@@ -6,11 +6,14 @@ import (
"path/filepath"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/tfile"
"github.com/krau/SaveAny-Bot/storage"
)
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
Ctx context.Context
@@ -22,6 +25,11 @@ type Task struct {
localPath string
}
// Title implements core.Exectable.
func (t *Task) Title() string {
return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.File.Name(), t.Storage.Name(), t.Path)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeTgfiles
}

12
go.sum
View File

@@ -106,8 +106,6 @@ github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GM
github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ=
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
github.com/go-faster/jx v1.2.0 h1:T2YHJPrFaYu21fJtUxC9GzmluKu8rVIFDwwGBKTDseI=
github.com/go-faster/jx v1.2.0/go.mod h1:UWLOVDmMG597a5tBFPLIWJdUxz5/2emOpfsj9Neg0PE=
github.com/go-faster/xor v0.3.0/go.mod h1:x5CaDY9UKErKzqfRfFZdfu+OSTfoZny3w5Ak7UxcipQ=
@@ -155,8 +153,6 @@ github.com/gotd/ige v0.2.2 h1:XQ9dJZwBfDnOGSTxKXBGP4gMud3Qku2ekScRjDWWfEk=
github.com/gotd/ige v0.2.2/go.mod h1:tuCRb+Y5Y3eNTo3ypIfNpQ4MFjrnONiL2jN2AKZXmb0=
github.com/gotd/neo v0.1.5 h1:oj0iQfMbGClP8xI59x7fE/uHoTJD7NZH9oV1WNuPukQ=
github.com/gotd/neo v0.1.5/go.mod h1:9A2a4bn9zL6FADufBdt7tZt+WMhvZoc5gWXihOPoiBQ=
github.com/gotd/td v0.132.0 h1:Iqm3S2b+8kDgA9237IDXRxj7sryUpvy+4Cr50/0tpx4=
github.com/gotd/td v0.132.0/go.mod h1:4CDGYS+rDtOqotRheGaF9MS5g6jaUewvSXqBNJnx8SQ=
github.com/gotd/td v0.136.0 h1:f7vx/1rlvP59L5EKR820XpMRO2k267wW8/F0rAWbepc=
github.com/gotd/td v0.136.0/go.mod h1:mStcqs/9FXhNhWnPTguptSwqkQbRIwXLw3SCSpzPJxM=
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf h1:WfD7VjIE6z8dIvMsI4/s+1qr5EL+zoIGev1BQj1eoJ8=
@@ -169,8 +165,6 @@ github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3 h1:2713fQZ560HxoNVgfJH41GKzjMjIG+DW4hH6nYXfXW8=
github.com/johannesboyne/gofakes3 v0.0.0-20250916175020-ebf3e50324d3/go.mod h1:S4S9jGBVlLri0OeqrSSbCGG5vsI6he06UJyuz1WT1EE=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -290,8 +284,6 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
@@ -305,8 +297,6 @@ golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -356,8 +346,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -49,6 +49,7 @@ func ParseWithContext(ctx context.Context, url string) (*parser.Item, error) {
}
}
// CanHandle checks if any registered parser can handle the given URL and returns the parser if found.
func CanHandle(url string) (bool, parser.Parser) {
for _, pser := range parsers.Get() {
if pser.CanHandle(url) {

View File

@@ -1,5 +1,5 @@
package tasktype
//go:generate go-enum --values --names --flag --nocase
// ENUM(tgfiles,tphpics,parseditem)
// ENUM(tgfiles,tphpics,parseditem,directlinks)
type TaskType string

View File

@@ -18,6 +18,8 @@ const (
TaskTypeTphpics TaskType = "tphpics"
// TaskTypeParseditem is a TaskType of type parseditem.
TaskTypeParseditem TaskType = "parseditem"
// TaskTypeDirectlinks is a TaskType of type directlinks.
TaskTypeDirectlinks TaskType = "directlinks"
)
var ErrInvalidTaskType = fmt.Errorf("not a valid TaskType, try [%s]", strings.Join(_TaskTypeNames, ", "))
@@ -26,6 +28,7 @@ var _TaskTypeNames = []string{
string(TaskTypeTgfiles),
string(TaskTypeTphpics),
string(TaskTypeParseditem),
string(TaskTypeDirectlinks),
}
// TaskTypeNames returns a list of possible string values of TaskType.
@@ -41,6 +44,7 @@ func TaskTypeValues() []TaskType {
TaskTypeTgfiles,
TaskTypeTphpics,
TaskTypeParseditem,
TaskTypeDirectlinks,
}
}
@@ -57,9 +61,10 @@ func (x TaskType) IsValid() bool {
}
var _TaskTypeValue = map[string]TaskType{
"tgfiles": TaskTypeTgfiles,
"tphpics": TaskTypeTphpics,
"parseditem": TaskTypeParseditem,
"tgfiles": TaskTypeTgfiles,
"tphpics": TaskTypeTphpics,
"parseditem": TaskTypeParseditem,
"directlinks": TaskTypeDirectlinks,
}
// ParseTaskType attempts to convert a string to a TaskType.

View File

@@ -55,7 +55,7 @@ func (r *Resource) ID() string {
h.Write([]byte(r.Filename))
h.Write([]byte(r.MimeType))
h.Write([]byte(r.Extension))
h.Write([]byte(fmt.Sprintf("%d", r.Size)))
fmt.Fprintf(h, "%d", r.Size)
for k, v := range r.Hash {
h.Write([]byte(k))

View File

@@ -38,7 +38,7 @@ func (tq *TaskQueue[T]) Add(task *Task[T]) error {
return fmt.Errorf("task with ID %s already exists", task.ID)
}
if task.IsCancelled() {
if task.Cancelled() {
return fmt.Errorf("task %s has been cancelled", task.ID)
}
@@ -50,6 +50,8 @@ func (tq *TaskQueue[T]) Add(task *Task[T]) error {
return nil
}
// Get retrieves and removes the next non-cancelled task from the queue, adding it to the running tasks.
// Blocks until a task is available or the queue is closed.
func (tq *TaskQueue[T]) Get() (*Task[T], error) {
tq.mu.Lock()
defer tq.mu.Unlock()
@@ -69,7 +71,7 @@ func (tq *TaskQueue[T]) Get() (*Task[T], error) {
tq.tasks.Remove(element)
task.element = nil
if !task.IsCancelled() {
if !task.Cancelled() {
tq.runningTaskMap[task.ID] = task
return task, nil
}
@@ -82,38 +84,21 @@ func (tq *TaskQueue[T]) Get() (*Task[T], error) {
return nil, fmt.Errorf("queue is closed and empty")
}
// Done stops(cancels) and removes the task from the running tasks.
func (tq *TaskQueue[T]) Done(taskID string) {
tq.mu.Lock()
defer tq.mu.Unlock()
delete(tq.taskMap, taskID)
delete(tq.runningTaskMap, taskID)
}
func (tq *TaskQueue[T]) Peek() (*Task[T], error) {
tq.mu.RLock()
defer tq.mu.RUnlock()
if tq.tasks.Len() == 0 {
return nil, fmt.Errorf("queue is empty")
}
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
if !task.IsCancelled() {
return task, nil
}
}
return nil, fmt.Errorf("queue has no valid tasks")
}
func (tq *TaskQueue[T]) Length() int {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.tasks.Len()
}
// ActiveLength returns the number of non-cancelled tasks in the queue.
func (tq *TaskQueue[T]) ActiveLength() int {
tq.mu.RLock()
defer tq.mu.RUnlock()
@@ -121,13 +106,58 @@ func (tq *TaskQueue[T]) ActiveLength() int {
count := 0
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
if !task.IsCancelled() {
if !task.Cancelled() {
count++
}
}
return count
}
// RunningTasks returns the currently running tasks' info.
func (tq *TaskQueue[T]) RunningTasks() []TaskInfo {
tq.mu.RLock()
defer tq.mu.RUnlock()
tasks := make([]TaskInfo, 0, len(tq.runningTaskMap))
for _, task := range tq.runningTaskMap {
if task.Cancelled() {
continue
}
tasks = append(tasks, TaskInfo{
ID: task.ID,
Title: task.Title,
Created: task.created,
Cancelled: task.Cancelled(),
})
}
return tasks
}
// QueuedTasks returns the queued (not yet running) tasks' info.
// The sorting is in the order of addition.
func (tq *TaskQueue[T]) QueuedTasks() []TaskInfo {
tq.mu.RLock()
defer tq.mu.RUnlock()
tasks := make([]TaskInfo, 0, tq.tasks.Len())
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
if !task.Cancelled() {
tasks = append(tasks, TaskInfo{
ID: task.ID,
Title: task.Title,
Created: task.created,
Cancelled: task.Cancelled(),
})
}
}
return tasks
}
// CancelTask cancels a task by its ID.
// It looks for the task in both queued and running tasks.
// [NOTE] Cancelled tasks will not be removed from the queue, but marked as cancelled. Use Done to remove them.
// [WARN] Cancelling a running task relies on the task's implementation to respect the cancellation. If the task does not check for cancellation, it may continue running.
func (tq *TaskQueue[T]) CancelTask(taskID string) error {
tq.mu.RLock()
task, exists := tq.taskMap[taskID]
@@ -144,52 +174,6 @@ func (tq *TaskQueue[T]) CancelTask(taskID string) error {
return nil
}
func (tq *TaskQueue[T]) RemoveTask(taskID string) error {
tq.mu.Lock()
defer tq.mu.Unlock()
task, exists := tq.taskMap[taskID]
if !exists {
_, exists = tq.runningTaskMap[taskID]
if exists {
delete(tq.runningTaskMap, taskID)
}
return fmt.Errorf("task %s is already running, cannot remove from queue", taskID)
}
if task.element != nil {
tq.tasks.Remove(task.element)
}
delete(tq.taskMap, taskID)
task.Cancel()
return nil
}
func (tq *TaskQueue[T]) CancelAll() {
tq.mu.RLock()
tasks := make([]*Task[T], 0, tq.tasks.Len())
for element := tq.tasks.Front(); element != nil; element = element.Next() {
tasks = append(tasks, element.Value.(*Task[T]))
}
tq.mu.RUnlock()
for _, task := range tasks {
task.Cancel()
}
}
func (tq *TaskQueue[T]) GetTask(taskID string) (*Task[T], error) {
tq.mu.RLock()
defer tq.mu.RUnlock()
task, exists := tq.taskMap[taskID]
if !exists {
return nil, fmt.Errorf("task %s does not exist", taskID)
}
return task, nil
}
func (tq *TaskQueue[T]) Close() {
tq.mu.Lock()
defer tq.mu.Unlock()
@@ -197,45 +181,3 @@ func (tq *TaskQueue[T]) Close() {
tq.closed = true
tq.cond.Broadcast()
}
func (tq *TaskQueue[T]) IsClosed() bool {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.closed
}
func (tq *TaskQueue[T]) Clear() {
tq.mu.Lock()
defer tq.mu.Unlock()
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
task.Cancel()
}
tq.tasks.Init()
tq.taskMap = make(map[string]*Task[T])
}
func (tq *TaskQueue[T]) CleanupCancelled() int {
tq.mu.Lock()
defer tq.mu.Unlock()
removed := 0
element := tq.tasks.Front()
for element != nil {
next := element.Next()
task := element.Value.(*Task[T])
if task.IsCancelled() {
tq.tasks.Remove(element)
delete(tq.taskMap, task.ID)
removed++
}
element = next
}
return removed
}

View File

@@ -11,7 +11,7 @@ import (
// helper to create a simple Task with integer payload
func newTask(id string) *queue.Task[int] {
return queue.NewTask(context.Background(), id, 0)
return queue.NewTask(context.Background(), id, "testing", 0)
}
func TestAddAndLength(t *testing.T) {
@@ -39,37 +39,6 @@ func TestDuplicateAdd(t *testing.T) {
}
}
func TestGetAndPeek(t *testing.T) {
q := queue.NewTaskQueue[int]()
t1 := newTask("a")
t2 := newTask("b")
q.Add(t1)
q.Add(t2)
// Peek should return t1
peeked, err := q.Peek()
if err != nil {
t.Fatalf("unexpected error on Peek: %v", err)
}
if peeked.ID != "a" {
t.Fatalf("expected Peek ID 'a', got '%s'", peeked.ID)
}
// Get should return t1 then t2
first, err := q.Get()
if err != nil {
t.Fatalf("unexpected error on Get: %v", err)
}
if first.ID != "a" {
t.Fatalf("expected first Get ID 'a', got '%s'", first.ID)
}
second, err := q.Get()
if err != nil {
t.Fatalf("unexpected error on second Get: %v", err)
}
if second.ID != "b" {
t.Fatalf("expected second Get ID 'b', got '%s'", second.ID)
}
}
func TestCancelAndActiveLength(t *testing.T) {
q := queue.NewTaskQueue[int]()
t1 := newTask("1")
@@ -90,41 +59,6 @@ func TestCancelAndActiveLength(t *testing.T) {
}
}
func TestRemoveTask(t *testing.T) {
q := queue.NewTaskQueue[int]()
t1 := newTask("r1")
q.Add(t1)
if err := q.RemoveTask("r1"); err != nil {
t.Fatalf("unexpected error on RemoveTask: %v", err)
}
if q.Length() != 0 {
t.Fatalf("expected length 0 after remove, got %d", q.Length())
}
}
func TestClearAndCleanupCancelled(t *testing.T) {
q := queue.NewTaskQueue[int]()
tasks := []*queue.Task[int]{newTask("c1"), newTask("c2"), newTask("c3")}
for _, tsk := range tasks {
q.Add(tsk)
}
// Cancel one
q.CancelTask("c2")
// Cleanup cancelled
removed := q.CleanupCancelled()
if removed != 1 {
t.Fatalf("expected removed 1, got %d", removed)
}
if q.ActiveLength() != 2 {
t.Fatalf("expected active length 2 after cleanup, got %d", q.ActiveLength())
}
// Clear all
q.Clear()
if q.Length() != 0 {
t.Fatalf("expected length 0 after clear, got %d", q.Length())
}
}
func TestCloseBehavior(t *testing.T) {
q := queue.NewTaskQueue[int]()
done := make(chan struct{})

View File

@@ -8,6 +8,7 @@ import (
type Task[T any] struct {
ID string
Title string
Data T
ctx context.Context
cancel context.CancelFunc
@@ -15,10 +16,19 @@ type Task[T any] struct {
element *list.Element
}
func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
// Read-only info about a task
type TaskInfo struct {
ID string
Created time.Time
Cancelled bool
Title string
}
func NewTask[T any](ctx context.Context, id string, title string, data T) *Task[T] {
cancelCtx, cancel := context.WithCancel(ctx)
return &Task[T]{
ID: id,
Title: title,
Data: data,
ctx: cancelCtx,
cancel: cancel,
@@ -26,7 +36,7 @@ func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
}
}
func (t *Task[T]) IsCancelled() bool {
func (t *Task[T]) Cancelled() bool {
select {
case <-t.ctx.Done():
return true

View File

@@ -43,6 +43,8 @@ type Add struct {
TphDirPath string // unescaped telegraph.Page.Path
// parseditem
ParsedItem *parser.Item
// directlinks
DirectLinks []string
}
type SetDefaultStorage struct {

147
storage/telegram/split.go Normal file
View File

@@ -0,0 +1,147 @@
package telegram
import (
"archive/zip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
)
type splitWriter struct {
baseName string
partSize int64
currentPart int
currentSize int64
currentFile *os.File
totalParts int
}
func newSplitWriter(baseName string, partSize int64) *splitWriter {
return &splitWriter{
baseName: baseName,
partSize: partSize,
currentPart: 0,
}
}
// Write implements io.Writer interface
func (w *splitWriter) Write(p []byte) (n int, err error) {
written := 0
for written < len(p) {
if w.currentFile == nil || w.currentSize >= w.partSize {
if err := w.nextPart(); err != nil {
return written, err
}
}
toWrite := int64(len(p) - written)
remaining := w.partSize - w.currentSize
if toWrite > remaining {
toWrite = remaining
}
nw, err := w.currentFile.Write(p[written : written+int(toWrite)])
written += nw
w.currentSize += int64(nw)
if err != nil {
return written, err
}
}
return written, nil
}
func (w *splitWriter) Close() error {
if w.currentFile != nil {
return w.currentFile.Close()
}
return nil
}
func (w *splitWriter) nextPart() error {
if w.currentFile != nil {
if err := w.currentFile.Close(); err != nil {
return err
}
}
partName := w.partName(w.currentPart)
file, err := os.Create(partName)
if err != nil {
return err
}
w.currentFile = file
w.currentSize = 0
w.currentPart++
return nil
}
func (w *splitWriter) partName(partNum int) string {
// file.zip.001, file.zip.002, ...
return fmt.Sprintf("%s.zip.%03d", w.baseName, partNum+1)
}
func (w *splitWriter) finalize() error {
w.totalParts = w.currentPart
// 如果只有一个分卷,直接重命名为 .zip
if w.totalParts == 1 {
oldName := fmt.Sprintf("%s.zip.001", w.baseName)
newName := fmt.Sprintf("%s.zip", w.baseName)
return os.Rename(oldName, newName)
}
return nil
}
func CreateSplitZip(ctx context.Context, reader io.Reader, size int64, fileName, outputBase string, partSize int64) error {
// seek the reader if possible
if rs, ok := reader.(io.ReadSeeker); ok {
if _, err := rs.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek reader: %w", err)
}
}
outputDir := filepath.Dir(outputBase)
if err := os.MkdirAll(outputDir, os.ModePerm); err != nil {
return fmt.Errorf("failed to create output directory: %w", err)
}
splitWriter := newSplitWriter(outputBase, partSize)
defer splitWriter.Close()
zipWriter := zip.NewWriter(splitWriter)
defer zipWriter.Close()
header := &zip.FileHeader{
Name: fileName,
Method: zip.Store, // just store without compression
Modified: time.Now(),
}
writer, err := zipWriter.CreateHeader(header)
if err != nil {
return fmt.Errorf("failed to create zip header: %w", err)
}
copied, err := io.Copy(writer, reader)
if err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
if copied != size {
return fmt.Errorf("incomplete write: expected %d bytes, got %d bytes", size, copied)
}
if err := zipWriter.Close(); err != nil {
return fmt.Errorf("failed to close zip writer: %w", err)
}
if err := splitWriter.Close(); err != nil {
return fmt.Errorf("failed to close split writer: %w", err)
}
if err := splitWriter.finalize(); err != nil {
return fmt.Errorf("failed to rename split files: %w", err)
}
return nil
}

View File

@@ -0,0 +1,55 @@
package telegram
import (
"os"
"path/filepath"
"testing"
)
func TestCreateSplitZip(t *testing.T) {
input := "tests/testfile.dat"
file, err := os.Open(input)
if err != nil {
t.Fatalf("failed to open test file: %v", err)
}
defer file.Close()
fileName := filepath.Base(input)
fileInfo, err := file.Stat()
if err != nil {
t.Fatalf("failed to stat test file: %v", err)
}
fileSize := fileInfo.Size()
tests := []struct {
partSize int64
output string
}{
{partSize: int64(1024 * 1024 * 500), output: "tests/split_test_output_500MB"},
{partSize: int64(1024 * 1024 * 100), output: "tests/split_test_output_100MB"},
}
for _, tt := range tests {
err = CreateSplitZip(t.Context(), file, fileSize, fileName, tt.output, tt.partSize)
if err != nil {
t.Fatalf("CreateSplitZip failed: %v", err)
}
matched, err := filepath.Glob(tt.output + ".z*")
if err != nil {
t.Fatalf("failed to glob split files: %v", err)
}
if len(matched) == 0 {
t.Fatalf("no split files found")
}
t.Logf("Created %d split files", len(matched))
for _, f := range matched {
info, err := os.Stat(f)
if err != nil {
t.Fatalf("failed to stat file %s: %v", f, err)
}
if info.Size() > tt.partSize {
t.Errorf("file %s exceeds part size: %d > %d", f, info.Size(), tt.partSize)
}
t.Logf(" - %s (%d bytes)", f, info.Size())
}
}
}

View File

@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/celestix/gotgproto/ext"
"github.com/charmbracelet/log"
"github.com/duke-git/lancet/v2/slice"
"github.com/duke-git/lancet/v2/validator"
@@ -16,6 +19,7 @@ import (
"github.com/gotd/td/telegram/message/styling"
"github.com/gotd/td/telegram/uploader"
"github.com/gotd/td/tg"
"github.com/krau/SaveAny-Bot/common/utils/dlutil"
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
"github.com/krau/SaveAny-Bot/config"
storconfig "github.com/krau/SaveAny-Bot/config/storage"
@@ -26,6 +30,12 @@ import (
"golang.org/x/time/rate"
)
const (
// https://core.telegram.org/api/config#upload-max-fileparts-default
DefaultSplitSize = 4000 * 524288 // 4000 * 512 KB
MaxUploadFileSize = 4000 * 524288 // 4000 * 512 KB
)
type Telegram struct {
config storconfig.TelegramStorageConfig
limiter *rate.Limiter
@@ -65,22 +75,39 @@ func (t *Telegram) Exists(ctx context.Context, storagePath string) bool {
}
func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) error {
if err := t.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit failed: %w", err)
tctx := tgutil.ExtFromContext(ctx)
if tctx == nil {
return fmt.Errorf("failed to get telegram context")
}
size := func() int64 {
if length := ctx.Value(ctxkey.ContentLength); length != nil {
if l, ok := length.(int64); ok {
return l
}
}
return -1 // unknown size
}()
if t.config.SkipLarge && size > MaxUploadFileSize {
log.FromContext(ctx).Warnf("Skipping file larger than Telegram limit (%d bytes): %d bytes", MaxUploadFileSize, size)
return nil
}
rs, seekable := r.(io.ReadSeeker)
if !seekable || rs == nil {
return fmt.Errorf("reader must implement io.ReadSeeker")
}
tctx := tgutil.ExtFromContext(ctx)
if tctx == nil {
return fmt.Errorf("failed to get telegram context")
splitSize := t.config.SplitSizeMB * 1024 * 1024
if splitSize <= 0 {
splitSize = DefaultSplitSize
}
if err := t.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit failed: %w", err)
}
// 去除前导斜杠并分隔路径, 当 len(parts):
// ==0, 存储到配置文件中的 chat_id, 随机文件名
// ==1, 视作只有文件名, 存储到配置文件中的 chat_id
// ==2, parts[0]: 视作要存储到的 chat_id, parts[1]: filename
parts := slice.Compact(strings.Split(strings.TrimPrefix(storagePath, "/"), "/"))
filename := ""
chatID := t.config.ChatID
@@ -113,17 +140,13 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er
}
upler := uploader.NewUploader(tctx.Raw).
WithPartSize(tglimit.MaxUploadPartSize).
WithThreads(config.C().Threads)
WithThreads(dlutil.BestThreads(size, config.C().Threads))
if size > splitSize {
// large file, use split uploader
return t.splitUpload(tctx, rs, filename, upler, peer, size, splitSize)
}
var file tg.InputFileClass
size := func() int64 {
if length := ctx.Value(ctxkey.ContentLength); length != nil {
if l, ok := length.(int64); ok {
return l
}
}
return -1 // unknown size
}()
if size < 0 {
file, err = upler.FromReader(ctx, filename, rs)
} else {
@@ -186,3 +209,91 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er
func (t *Telegram) CannotStream() string {
return "Telegram storage must use a ReaderSeeker"
}
func (t *Telegram) splitUpload(ctx *ext.Context, rs io.ReadSeeker, filename string, upler *uploader.Uploader, peer tg.InputPeerClass, fileSize, splitSize int64) error {
tempId := xid.New().String()
outputBase := filepath.Join(config.C().Temp.BasePath, tempId, strings.Split(filename, ".")[0])
defer func() {
// cleanup temp files
if err := os.RemoveAll(filepath.Join(config.C().Temp.BasePath, tempId)); err != nil {
log.FromContext(ctx).Warnf("Failed to cleanup temp split files: %s", err)
}
}()
if err := CreateSplitZip(ctx, rs, fileSize, filename, outputBase, splitSize); err != nil {
return fmt.Errorf("failed to create split zip: %w", err)
}
matched, err := filepath.Glob(outputBase + ".z*")
if err != nil {
return fmt.Errorf("failed to glob split files: %w", err)
}
inputFiles := make([]tg.InputFileClass, 0, len(matched))
for _, partPath := range matched {
// 串行上传, 不然容易被tg风控
err = func() error {
partFile, err := os.Open(partPath)
if err != nil {
return fmt.Errorf("failed to open split part %s: %w", partPath, err)
}
defer partFile.Close()
partInfo, err := partFile.Stat()
if err != nil {
return fmt.Errorf("failed to stat split part %s: %w", partPath, err)
}
partFileSize := partInfo.Size()
partName := filepath.Base(partPath)
partInputFile, err := upler.Upload(ctx, uploader.NewUpload(partName, partFile, partFileSize))
if err != nil {
return fmt.Errorf("failed to upload split part %s: %w", partPath, err)
}
inputFiles = append(inputFiles, partInputFile)
return nil
}()
if err != nil {
return fmt.Errorf("failed to upload split part %s: %w", partPath, err)
}
}
if len(inputFiles) == 1 {
// only one part, send as normal file
// shoud not happen as we already check fileSize > splitSize
doc := message.UploadedDocument(inputFiles[0]).
Filename(filepath.Base(matched[0])).
ForceFile(true).
MIME("application/zip")
_, err = ctx.Sender.
WithUploader(upler).
To(peer).
Media(ctx, doc)
return err
}
multiMedia := make([]message.MultiMediaOption, 0, len(inputFiles))
for i, inputFile := range inputFiles {
doc := message.UploadedDocument(inputFile).
Filename(filepath.Base(matched[i])).
MIME("application/zip")
multiMedia = append(multiMedia, doc)
}
sender := ctx.Sender
if len(multiMedia) <= 10 {
_, err = sender.WithUploader(upler).
To(peer).
Album(ctx, multiMedia[0], multiMedia[1:]...)
return err
}
// more than 10 parts, send in batches, each batch up to 10 parts
for i := 0; i < len(multiMedia); i += 10 {
end := min(i+10, len(multiMedia))
batch := multiMedia[i:end]
_, err = sender.WithUploader(upler).
To(peer).
Album(ctx, batch[0], batch[1:]...)
if err != nil {
return fmt.Errorf("failed to send album batch: %w", err)
}
}
return nil
}