diff --git a/client/bot/handlers/utils/shortcut/message.go b/client/bot/handlers/utils/shortcut/message.go index b9c8621..5542879 100644 --- a/client/bot/handlers/utils/shortcut/message.go +++ b/client/bot/handlers/utils/shortcut/message.go @@ -126,7 +126,7 @@ func GetFilesFromUpdateLinkMessageWithReplyEdit(ctx *ext.Context, update *ext.Up } msg, err := tgutil.GetMessageByID(tctx, chatId, msgId) if err != nil { - logger.Errorf("failed to get message by ID: %s", err) + logger.Error(err) continue } groupID, isGroup := msg.GetGroupedID() diff --git a/pkg/tfile/dler.go b/common/tdler/dler.go similarity index 76% rename from pkg/tfile/dler.go rename to common/tdler/dler.go index bfe8022..6ec2f48 100644 --- a/pkg/tfile/dler.go +++ b/common/tdler/dler.go @@ -1,13 +1,14 @@ -package tfile +package tdler import ( "github.com/gotd/td/telegram/downloader" "github.com/krau/SaveAny-Bot/common/utils/dlutil" "github.com/krau/SaveAny-Bot/config" "github.com/krau/SaveAny-Bot/pkg/consts/tglimit" + "github.com/krau/SaveAny-Bot/pkg/tfile" ) -func NewDownloader(file TGFile) *downloader.Builder { +func NewDownloader(file tfile.TGFile) *downloader.Builder { return downloader.NewDownloader().WithPartSize(tglimit.MaxPartSize). Download(file.Dler(), file.Location()).WithThreads(dlutil.BestThreads(file.Size(), config.C().Threads)) } diff --git a/common/utils/tgutil/message.go b/common/utils/tgutil/message.go index b4f0840..91ae085 100644 --- a/common/utils/tgutil/message.go +++ b/common/utils/tgutil/message.go @@ -9,12 +9,12 @@ import ( "github.com/celestix/gotgproto/ext" "github.com/duke-git/lancet/v2/maputil" - "github.com/duke-git/lancet/v2/mathutil" "github.com/duke-git/lancet/v2/slice" lcstrutil "github.com/duke-git/lancet/v2/strutil" "github.com/duke-git/lancet/v2/validator" "github.com/gabriel-vasile/mimetype" + "github.com/gotd/td/constant" "github.com/gotd/td/tg" "github.com/krau/SaveAny-Bot/common/cache" "github.com/krau/SaveAny-Bot/common/utils/strutil" @@ -112,6 +112,31 @@ func InputMessageClassSliceFromInt(ids []int) []tg.InputMessageClass { } func GetMessagesRange(ctx *ext.Context, chatID int64, minId, maxId int) ([]*tg.Message, error) { + if msg, err := getMessagesRange(ctx, chatID, minId, maxId); err == nil { + return msg, nil + } + in := constant.TDLibPeerID(chatID) + plain := in.ToPlain() + + var channel constant.TDLibPeerID + channel.Channel(plain) + if msg, err := getMessagesRange(ctx, int64(channel), minId, maxId); err == nil { + return msg, nil + } + var userID constant.TDLibPeerID + userID.User(plain) + if msg, err := getMessagesRange(ctx, int64(userID), minId, maxId); err == nil { + return msg, nil + } + var chat constant.TDLibPeerID + chat.Chat(plain) + if msg, err := getMessagesRange(ctx, int64(chat), minId, maxId); err == nil { + return msg, nil + } + return nil, fmt.Errorf("failed to get messages range for chatID %d", chatID) +} + +func getMessagesRange(ctx *ext.Context, chatID int64, minId, maxId int) ([]*tg.Message, error) { if minId > maxId { return nil, fmt.Errorf("minId (%d) cannot be greater than maxId (%d)", minId, maxId) } @@ -167,97 +192,98 @@ func GetMessagesRange(ctx *ext.Context, chatID int64, minId, maxId int) ([]*tg.M return result, nil } -type MessageItem struct { - Message *tg.Message - Error error -} +// [TODO] +// type MessageItem struct { +// Message *tg.Message +// Error error +// } -func IterMessages(ctx *ext.Context, chatID int64, minId, maxId int) (<-chan MessageItem, error) { - total := maxId - minId + 1 - ch := make(chan MessageItem, 100) +// func IterMessages(ctx *ext.Context, chatID int64, minId, maxId int) (<-chan MessageItem, error) { +// total := maxId - minId + 1 +// ch := make(chan MessageItem, 100) - go func() { - defer close(ch) - if !ctx.Self.Bot { - perr := ctx.PeerStorage.GetInputPeerById(chatID) - if perr == nil || perr.(*tg.InputPeerEmpty) != nil { - ch <- MessageItem{ - Error: fmt.Errorf("peer not found: %d", chatID), - } - return - } +// go func() { +// defer close(ch) +// if !ctx.Self.Bot { +// perr := ctx.PeerStorage.GetInputPeerById(chatID) +// if perr == nil || perr.(*tg.InputPeerEmpty) != nil { +// ch <- MessageItem{ +// Error: fmt.Errorf("peer not found: %d", chatID), +// } +// return +// } - for i := 0; i < total; i += 100 { - start := minId + i - end := min(start+100, maxId) - msgs, err := ctx.Raw.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{ - Peer: perr, - OffsetID: start, - AddOffset: start - end, - Limit: 100, - }) - if err != nil { - ch <- MessageItem{ - Error: fmt.Errorf("failed to get messages: %w", err), - } - return - } - var msgClass []tg.MessageClass - switch msgsv := msgs.(type) { - case *tg.MessagesMessages: - msgClass = msgsv.GetMessages() - case *tg.MessagesMessagesSlice: - msgClass = msgsv.GetMessages() - case *tg.MessagesChannelMessages: - msgClass = msgsv.GetMessages() - default: - ch <- MessageItem{ - Error: fmt.Errorf("unsupported message type: %T", msgsv), - } - continue - } - for _, msg := range msgClass { - msg, ok := msg.AsNotEmpty() - if !ok { - continue - } - switch msg := msg.(type) { - case *tg.Message: - key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msg.GetID()) - cache.Set(key, msg) - ch <- MessageItem{ - Message: msg, - } - } - } - } - } else { - for i := 0; i < total; i += 100 { - start := minId + i - end := min(start+100, maxId) - msgs, err := GetMessagesRange(ctx, chatID, start, end) - if err != nil { - ch <- MessageItem{ - Error: fmt.Errorf("failed to get messages: %w", err), - } - return - } - for _, msg := range msgs { - if msg == nil { - continue - } - ch <- MessageItem{ - Message: msg, - } - } - } - } - }() +// for i := 0; i < total; i += 100 { +// start := minId + i +// end := min(start+100, maxId) +// msgs, err := ctx.Raw.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{ +// Peer: perr, +// OffsetID: start, +// AddOffset: start - end, +// Limit: 100, +// }) +// if err != nil { +// ch <- MessageItem{ +// Error: fmt.Errorf("failed to get messages: %w", err), +// } +// return +// } +// var msgClass []tg.MessageClass +// switch msgsv := msgs.(type) { +// case *tg.MessagesMessages: +// msgClass = msgsv.GetMessages() +// case *tg.MessagesMessagesSlice: +// msgClass = msgsv.GetMessages() +// case *tg.MessagesChannelMessages: +// msgClass = msgsv.GetMessages() +// default: +// ch <- MessageItem{ +// Error: fmt.Errorf("unsupported message type: %T", msgsv), +// } +// continue +// } +// for _, msg := range msgClass { +// msg, ok := msg.AsNotEmpty() +// if !ok { +// continue +// } +// switch msg := msg.(type) { +// case *tg.Message: +// key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msg.GetID()) +// cache.Set(key, msg) +// ch <- MessageItem{ +// Message: msg, +// } +// } +// } +// } +// } else { +// for i := 0; i < total; i += 100 { +// start := minId + i +// end := min(start+100, maxId) +// msgs, err := GetMessagesRange(ctx, chatID, start, end) +// if err != nil { +// ch <- MessageItem{ +// Error: fmt.Errorf("failed to get messages: %w", err), +// } +// return +// } +// for _, msg := range msgs { +// if msg == nil { +// continue +// } +// ch <- MessageItem{ +// Message: msg, +// } +// } +// } +// } +// }() - return ch, nil -} +// return ch, nil +// } -func GetMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, error) { +func getMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, error) { key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msgID) if msg, ok := cache.Get[*tg.Message](key); ok { return msg, nil @@ -280,6 +306,33 @@ func GetMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, err return tgm, nil } +// f**k gotgproto's breaking changes +func GetMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, error) { + // we don't know what the input chatID is bot api style(e.g. channel with -100 prefix) or plain tdlib style(no any prefix and every id is positive) + if msg, err := getMessageByID(ctx, chatID, msgID); err == nil { + return msg, nil + } + in := constant.TDLibPeerID(chatID) + plain := in.ToPlain() + var channel constant.TDLibPeerID + channel.Channel(plain) + if msg, err := getMessageByID(ctx, int64(channel), msgID); err == nil { + return msg, nil + } + var chat constant.TDLibPeerID + chat.Chat(plain) + if msg, err := getMessageByID(ctx, int64(chat), msgID); err == nil { + return msg, nil + } + var userID constant.TDLibPeerID + userID.User(plain) + if msg, err := getMessageByID(ctx, int64(userID), msgID); err == nil { + return msg, nil + } + + return nil, fmt.Errorf("failed to get message by ID: chatID=%d, msgID=%d", chatID, msgID) +} + func GetGroupedMessages(ctx *ext.Context, chatID int64, msg *tg.Message) ([]*tg.Message, error) { groupID, isGroup := msg.GetGroupedID() if !isGroup || groupID == 0 { diff --git a/core/tasks/batchtfile/execute.go b/core/tasks/batchtfile/execute.go index c6c9be7..1c9427a 100644 --- a/core/tasks/batchtfile/execute.go +++ b/core/tasks/batchtfile/execute.go @@ -9,11 +9,11 @@ import ( "github.com/charmbracelet/log" "github.com/duke-git/lancet/v2/retry" + "github.com/krau/SaveAny-Bot/common/tdler" "github.com/krau/SaveAny-Bot/common/utils/fsutil" "github.com/krau/SaveAny-Bot/common/utils/ioutil" "github.com/krau/SaveAny-Bot/config" "github.com/krau/SaveAny-Bot/pkg/enums/ctxkey" - "github.com/krau/SaveAny-Bot/pkg/tfile" "golang.org/x/sync/errgroup" ) @@ -68,7 +68,7 @@ func (t *Task) processElement(ctx context.Context, elem TaskElement) error { errg.Go(func() error { defer pw.Close() logger.Info("Starting file download in stream mode") - _, err := tfile.NewDownloader(elem.File).Stream(uploadCtx, wr) + _, err := tdler.NewDownloader(elem.File).Stream(uploadCtx, wr) if err != nil { logger.Errorf("Failed to download file: %v", err) pw.CloseWithError(err) @@ -95,7 +95,7 @@ func (t *Task) processElement(ctx context.Context, elem TaskElement) error { t.downloaded.Add(int64(n)) t.Progress.OnProgress(ctx, t) }) - _, err = tfile.NewDownloader(elem.File).Parallel(ctx, wrAt) + _, err = tdler.NewDownloader(elem.File).Parallel(ctx, wrAt) if err != nil { return fmt.Errorf("failed to download file: %w", err) } diff --git a/core/tasks/telegraph/execute.go b/core/tasks/telegraph/execute.go index 72d0dfb..dfa327b 100644 --- a/core/tasks/telegraph/execute.go +++ b/core/tasks/telegraph/execute.go @@ -11,7 +11,6 @@ import ( "github.com/duke-git/lancet/v2/retry" "github.com/krau/SaveAny-Bot/common/utils/fsutil" "github.com/krau/SaveAny-Bot/config" - "go.uber.org/multierr" "golang.org/x/sync/errgroup" ) @@ -48,13 +47,10 @@ func (t *Task) processPic(ctx context.Context, picUrl string, index int) error { retry.Context(ctx), retry.RetryTimes(uint(config.C().Retry)), } - var lastErr error err := retry.Retry(func() error { - var body io.ReadCloser - body, lastErr = t.client.Download(ctx, picUrl) - if lastErr != nil { - lastErr = fmt.Errorf("failed to download picture %s: %w", picUrl, lastErr) - return lastErr + body, err := t.client.Download(ctx, picUrl) + if err != nil { + return fmt.Errorf("failed to download picture %s: %w", picUrl, err) } defer body.Close() filename := fmt.Sprintf("%d%s", index+1, path.Ext(picUrl)) @@ -63,8 +59,7 @@ func (t *Task) processPic(ctx context.Context, picUrl string, index int) error { fmt.Sprintf("tph_%s_%s", t.TaskID(), filename), )) if err != nil { - lastErr = fmt.Errorf("failed to create cache file for picture %s: %w", filename, err) - return lastErr + return fmt.Errorf("failed to create cache file for picture %s: %w", filename, err) } defer func() { if err := cacheFile.CloseAndRemove(); err != nil { @@ -72,26 +67,26 @@ func (t *Task) processPic(ctx context.Context, picUrl string, index int) error { logger.Errorf("Failed to close and remove cache file for picture %s: %v", filename, err) } }() - _, lastErr = io.Copy(cacheFile, body) - if lastErr != nil { - lastErr = fmt.Errorf("failed to copy picture %s to cache file: %w", filename, lastErr) - return lastErr + _, err = io.Copy(cacheFile, body) + if err != nil { + return fmt.Errorf("failed to copy picture %s to cache file: %w", filename, err) } _, err = cacheFile.Seek(0, 0) if err != nil { - lastErr = fmt.Errorf("failed to seek cache file for picture %s: %w", filename, err) - return lastErr + return fmt.Errorf("failed to seek cache file for picture %s: %w", filename, err) + } + err = t.Stor.Save(ctx, cacheFile, path.Join(t.StorPath, filename)) + if err != nil { + return fmt.Errorf("failed to save picture %s: %w", filename, err) } - lastErr = t.Stor.Save(ctx, cacheFile, path.Join(t.StorPath, filename)) } else { - lastErr = t.Stor.Save(ctx, body, path.Join(t.StorPath, filename)) + err = t.Stor.Save(ctx, body, path.Join(t.StorPath, filename)) } - if lastErr != nil { - lastErr = fmt.Errorf("failed to save picture %s: %w", filename, lastErr) - return lastErr + if err != nil { + return fmt.Errorf("failed to save picture %s: %w", filename, err) } return nil }, retryOpts...) - return multierr.Combine(err, lastErr) + return err } diff --git a/core/tasks/tfile/execute.go b/core/tasks/tfile/execute.go index 305c028..4efbfc0 100644 --- a/core/tasks/tfile/execute.go +++ b/core/tasks/tfile/execute.go @@ -5,13 +5,13 @@ import ( "fmt" "os" "path" - "time" "github.com/charmbracelet/log" + "github.com/duke-git/lancet/v2/retry" + "github.com/krau/SaveAny-Bot/common/tdler" "github.com/krau/SaveAny-Bot/common/utils/fsutil" "github.com/krau/SaveAny-Bot/config" "github.com/krau/SaveAny-Bot/pkg/enums/ctxkey" - "github.com/krau/SaveAny-Bot/pkg/tfile" ) func (t *Task) Execute(ctx context.Context) error { @@ -40,7 +40,7 @@ func (t *Task) Execute(ctx context.Context) error { t.Progress.OnDone(ctx, t, err) } }() - _, err = tfile.NewDownloader(t.File).Parallel(ctx, wrAt) + _, err = tdler.NewDownloader(t.File).Parallel(ctx, wrAt) if err != nil { return fmt.Errorf("failed to download file: %w", err) } @@ -57,30 +57,19 @@ func (t *Task) Execute(ctx context.Context) error { return fmt.Errorf("failed to get file stat: %w", err) } vctx := context.WithValue(ctx, ctxkey.ContentLength, fileStat.Size()) - for i := range config.C().Retry + 1 { - if err = vctx.Err(); err != nil { - return fmt.Errorf("context canceled while saving file: %w", err) - } - var file *os.File - file, err = os.Open(t.localPath) + err = retry.Retry(func() error { + file, err := os.Open(t.localPath) if err != nil { return fmt.Errorf("failed to open cache file: %w", err) } defer file.Close() if err = t.Storage.Save(vctx, file, t.Path); err != nil { - if i == config.C().Retry { - return fmt.Errorf("failed to save file: %w", err) - } - logger.Errorf("Failed to save file: %s, retrying...", err) - select { - case <-vctx.Done(): - return fmt.Errorf("context canceled during retry delay: %w", vctx.Err()) - case <-time.After(time.Duration(i*500) * time.Millisecond): - } - continue + return fmt.Errorf("failed to save file: %w", err) } return nil + }, retry.RetryTimes(uint(config.C().Retry)), retry.Context(vctx)) + if err != nil { + return fmt.Errorf("failed to save file after retries: %w", err) } - return fmt.Errorf("failed to save file after retries") - + return nil } diff --git a/core/tasks/tfile/stream.go b/core/tasks/tfile/stream.go index 7bff591..881c31e 100644 --- a/core/tasks/tfile/stream.go +++ b/core/tasks/tfile/stream.go @@ -6,7 +6,7 @@ import ( "io" "github.com/charmbracelet/log" - "github.com/krau/SaveAny-Bot/pkg/tfile" + "github.com/krau/SaveAny-Bot/common/tdler" "golang.org/x/sync/errgroup" ) @@ -23,7 +23,7 @@ func executeStream(ctx context.Context, task *Task) error { errg.Go(func() error { defer pw.Close() logger.Info("Starting file download in stream mode") - _, err := tfile.NewDownloader(task.File).Stream(uploadCtx, wr) + _, err := tdler.NewDownloader(task.File).Stream(uploadCtx, wr) if err != nil { logger.Errorf("Failed to download file: %v", err) pw.CloseWithError(err) diff --git a/go.mod b/go.mod index 68b84dc..3db515d 100644 --- a/go.mod +++ b/go.mod @@ -117,7 +117,7 @@ require ( github.com/spf13/pflag v1.0.10 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect - go.uber.org/multierr v1.11.0 + go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/sync v0.18.0 golang.org/x/sys v0.38.0 // indirect diff --git a/storage/telegram/telegram.go b/storage/telegram/telegram.go index 169479c..7c0ad79 100644 --- a/storage/telegram/telegram.go +++ b/storage/telegram/telegram.go @@ -103,8 +103,8 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er if filename == "" { filename = xid.New().String() + mtype.Extension() } - peer := tctx.PeerStorage.GetInputPeerById(chatID) - if peer == nil { + peer := tryGetInputPeer(tctx, chatID) + if peer == nil || peer.Zero() { return fmt.Errorf("failed to get input peer for chat ID %d", chatID) } diff --git a/storage/telegram/util.go b/storage/telegram/util.go index f2717c1..64521a5 100644 --- a/storage/telegram/util.go +++ b/storage/telegram/util.go @@ -7,6 +7,9 @@ import ( "io" "time" + "github.com/celestix/gotgproto/ext" + "github.com/gotd/td/constant" + "github.com/gotd/td/tg" "github.com/krau/ffmpeg-go" "github.com/yapingcat/gomedia/go-mp4" ) @@ -133,3 +136,28 @@ func extractFrameAt(rs io.ReadSeeker, timestamp float64) ([]byte, error) { return out.Bytes(), nil } + +func tryGetInputPeer(ctx *ext.Context, chatID int64) tg.InputPeerClass { + peer := ctx.PeerStorage.GetInputPeerById(chatID) + if peer != nil && !peer.Zero() { + return peer + } + id := constant.TDLibPeerID(chatID) + plain := id.ToPlain() + var channel constant.TDLibPeerID + channel.Channel(plain) + peer = ctx.PeerStorage.GetInputPeerById(int64(channel)) + if peer != nil && !peer.Zero() { + return peer + } + var chat constant.TDLibPeerID + chat.Chat(plain) + peer = ctx.PeerStorage.GetInputPeerById(int64(chat)) + if peer != nil && !peer.Zero() { + return peer + } + var user constant.TDLibPeerID + user.User(plain) + peer = ctx.PeerStorage.GetInputPeerById(int64(user)) + return peer +}