feat: add IterMessages function for message iteration with error handling
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/celestix/gotgproto/ext"
|
"github.com/celestix/gotgproto/ext"
|
||||||
"github.com/duke-git/lancet/v2/maputil"
|
"github.com/duke-git/lancet/v2/maputil"
|
||||||
|
|
||||||
"github.com/duke-git/lancet/v2/mathutil"
|
"github.com/duke-git/lancet/v2/mathutil"
|
||||||
"github.com/duke-git/lancet/v2/slice"
|
"github.com/duke-git/lancet/v2/slice"
|
||||||
lcstrutil "github.com/duke-git/lancet/v2/strutil"
|
lcstrutil "github.com/duke-git/lancet/v2/strutil"
|
||||||
@@ -159,6 +160,96 @@ func GetMessagesRange(ctx *ext.Context, chatID int64, minId, maxId int) ([]*tg.M
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MessageItem struct {
|
||||||
|
Message *tg.Message
|
||||||
|
Error error
|
||||||
|
}
|
||||||
|
|
||||||
|
func IterMessages(ctx *ext.Context, chatID int64, minId, maxId int) (<-chan MessageItem, error) {
|
||||||
|
total := maxId - minId + 1
|
||||||
|
ch := make(chan MessageItem, 100)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(ch)
|
||||||
|
if !ctx.Self.Bot {
|
||||||
|
perr := ctx.PeerStorage.GetInputPeerById(chatID)
|
||||||
|
if perr == nil || perr.(*tg.InputPeerEmpty) != nil {
|
||||||
|
ch <- MessageItem{
|
||||||
|
Error: fmt.Errorf("peer not found: %d", chatID),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < total; i += 100 {
|
||||||
|
start := minId + i
|
||||||
|
end := min(start+100, maxId)
|
||||||
|
msgs, err := ctx.Raw.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
|
||||||
|
Peer: perr,
|
||||||
|
OffsetID: start,
|
||||||
|
AddOffset: start - end,
|
||||||
|
Limit: 100,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
ch <- MessageItem{
|
||||||
|
Error: fmt.Errorf("failed to get messages: %w", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var msgClass []tg.MessageClass
|
||||||
|
switch msgsv := msgs.(type) {
|
||||||
|
case *tg.MessagesMessages:
|
||||||
|
msgClass = msgsv.GetMessages()
|
||||||
|
case *tg.MessagesMessagesSlice:
|
||||||
|
msgClass = msgsv.GetMessages()
|
||||||
|
case *tg.MessagesChannelMessages:
|
||||||
|
msgClass = msgsv.GetMessages()
|
||||||
|
default:
|
||||||
|
ch <- MessageItem{
|
||||||
|
Error: fmt.Errorf("unsupported message type: %T", msgsv),
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, msg := range msgClass {
|
||||||
|
msg, ok := msg.AsNotEmpty()
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch msg := msg.(type) {
|
||||||
|
case *tg.Message:
|
||||||
|
key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msg.GetID())
|
||||||
|
cache.Set(key, msg)
|
||||||
|
ch <- MessageItem{
|
||||||
|
Message: msg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for i := 0; i < total; i += 100 {
|
||||||
|
start := minId + i
|
||||||
|
end := min(start+100, maxId)
|
||||||
|
msgs, err := GetMessagesRange(ctx, chatID, start, end)
|
||||||
|
if err != nil {
|
||||||
|
ch <- MessageItem{
|
||||||
|
Error: fmt.Errorf("failed to get messages: %w", err),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, msg := range msgs {
|
||||||
|
if msg == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ch <- MessageItem{
|
||||||
|
Message: msg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch, nil
|
||||||
|
}
|
||||||
|
|
||||||
func GetMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, error) {
|
func GetMessageByID(ctx *ext.Context, chatID int64, msgID int) (*tg.Message, error) {
|
||||||
key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msgID)
|
key := fmt.Sprintf("tgmsg:%d:%d:%d", ctx.Self.ID, chatID, msgID)
|
||||||
if msg, ok := cache.Get[*tg.Message](key); ok {
|
if msg, ok := cache.Get[*tg.Message](key); ok {
|
||||||
|
|||||||
Reference in New Issue
Block a user