From f5e33472eb91390ce22e93b20ecf232729798a06 Mon Sep 17 00:00:00 2001 From: krau <71133316+krau@users.noreply.github.com> Date: Wed, 18 Jun 2025 10:50:54 +0800 Subject: [PATCH] feat: add IterMessages function for message iteration with error handling --- common/utils/tgutil/message.go | 91 ++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/common/utils/tgutil/message.go b/common/utils/tgutil/message.go index 4b7da85..16f5839 100644 --- a/common/utils/tgutil/message.go +++ b/common/utils/tgutil/message.go @@ -7,6 +7,7 @@ import ( "github.com/celestix/gotgproto/ext" "github.com/duke-git/lancet/v2/maputil" + "github.com/duke-git/lancet/v2/mathutil" "github.com/duke-git/lancet/v2/slice" lcstrutil "github.com/duke-git/lancet/v2/strutil" @@ -159,6 +160,96 @@ func GetMessagesRange(ctx *ext.Context, chatID int64, minId, maxId int) ([]*tg.M return result, nil } +type MessageItem struct { + Message *tg.Message + Error error +} + +func IterMessages(ctx *ext.Context, chatID int64, minId, maxId int) (<-chan MessageItem, error) { + total := maxId - minId + 1 + ch := make(chan MessageItem, 100) + + go func() { + defer close(ch) + if !ctx.Self.Bot { + perr := ctx.PeerStorage.GetInputPeerById(chatID) + if perr == nil || perr.(*tg.InputPeerEmpty) != nil { + ch <- MessageItem{ + Error: fmt.Errorf("peer not found: %d", chatID), + } + return + } + + for i := 0; i < total; i += 100 { + start := minId + i + end := min(start+100, maxId) + msgs, err := ctx.Raw.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{ + Peer: perr, + OffsetID: start, + AddOffset: start - end, + Limit: 100, + }) + if err != nil { + ch <- MessageItem{ + Error: fmt.Errorf("failed to get messages: %w", err), + } + return + } + var msgClass []tg.MessageClass + switch msgsv := msgs.(type) { + case *tg.MessagesMessages: + msgClass = msgsv.GetMessages() + case *tg.MessagesMessagesSlice: + msgClass = msgsv.GetMessages() + case *tg.MessagesChannelMessages: + msgClass = msgsv.GetMessages() + default: + ch <- MessageItem{ + Error: fmt.Errorf("unsupported message type: %T", msgsv), + } + continue + } + for _, msg := range msgClass { + msg, ok := msg.AsNotEmpty() + if !ok { + continue + } + switch msg := msg.(type) { + case *tg.Message: + key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msg.GetID()) + cache.Set(key, msg) + ch <- MessageItem{ + Message: msg, + } + } + } + } + } else { + for i := 0; i < total; i += 100 { + start := minId + i + end := min(start+100, maxId) + msgs, err := GetMessagesRange(ctx, chatID, start, end) + if err != nil { + ch <- MessageItem{ + Error: fmt.Errorf("failed to get messages: %w", err), + } + return + } + for _, msg := range msgs { + if msg == nil { + continue + } + ch <- MessageItem{ + Message: msg, + } + } + } + } + }() + + return ch, nil +} + func GetMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, error) { key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msgID) if msg, ok := cache.Get[*tg.Message](key); ok {