Compare commits

...

17 Commits

Author SHA1 Message Date
krau
ca0fd67fba ci: add latest tags for micro and pico Docker images 2025-12-18 18:44:18 +08:00
krau
4d736b925b docs: update storage config 2025-12-18 18:44:12 +08:00
krau
ead2b20f4e docs: add docker vriant introduction 2025-12-18 18:43:57 +08:00
krau
080d474714 revert: remove variant handling and simplify binary and asset naming in build-release workflow 2025-12-18 18:31:27 +08:00
krau
f453205fde feat: add Docker build flag and update version handling in update command 2025-12-18 18:12:31 +08:00
krau
407677f270 fix: update binary and asset naming conventions in build-release workflow 2025-12-18 17:59:55 +08:00
krau
958bfd1dbe ci: update build-release workflow to include asset name and additional build flags 2025-12-18 17:55:24 +08:00
krau
debe33d84d ci: add micro and pico Docker image build steps to workflow 2025-12-18 17:46:09 +08:00
krau
52eead3bf5 feat: refactor database dialect handling and add stubs for unsupported features 2025-12-18 17:42:20 +08:00
krau
0af049a507 feat: migrate S3 client implementation to a new package structure 2025-12-18 16:42:58 +08:00
krau
8752dd865c feat: refactor S3 storage implementation and reduce binary size 2025-12-18 16:21:40 +08:00
krau
c0b4580e34 fix: correct split size calculation in Save method 2025-12-16 20:52:17 +08:00
krau
280fd6ead8 fix: update DefaultSplitSize 2025-12-16 20:51:11 +08:00
krau
0ca3d97711 feat: add task command to client and Title method to Task for tasks queue managing, #157 2025-12-15 11:29:55 +08:00
krau
51198a1e3d fix: remove redundant cancellation in Done method 2025-12-15 11:15:17 +08:00
krau
651835c467 feat: refactor queue to remove unused methods and add comments 2025-12-15 10:49:40 +08:00
krau
45c978980c feat: add support for splitting large files into parts for Telegram storage, #156 2025-12-15 10:25:50 +08:00
44 changed files with 1178 additions and 366 deletions

View File

@@ -29,7 +29,7 @@ jobs:
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha
type=raw,value=latest,enable={{is_default_branch}}
type=raw,value=latest
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
@@ -55,6 +55,7 @@ jobs:
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
@@ -67,3 +68,47 @@ jobs:
VERSION=${{ steps.meta.outputs.version }}
GitCommit=${{ steps.args.outputs.git_commit }}
BuildTime=${{ steps.args.outputs.build_time }}
- name: Build and push micro Docker image
id: build-and-push-micro
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.micro
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:micro
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:micro-latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:micro-${{ steps.meta.outputs.version }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: |
type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
type=gha
cache-to: type=gha,mode=max
build-args: |
VERSION=${{ steps.meta.outputs.version }}
GitCommit=${{ steps.args.outputs.git_commit }}
BuildTime=${{ steps.args.outputs.build_time }}
- name: Build and push pico Docker image
id: build-and-push-pico
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile.pico
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:pico
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:pico-latest
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:pico-${{ steps.meta.outputs.version }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: |
type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
type=gha
cache-to: type=gha,mode=max
build-args: |
VERSION=${{ steps.meta.outputs.version }}
GitCommit=${{ steps.args.outputs.git_commit }}
BuildTime=${{ steps.args.outputs.build_time }}

View File

@@ -20,6 +20,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build \
-X 'github.com/krau/SaveAny-Bot/config.Version=${VERSION}' \
-X 'github.com/krau/SaveAny-Bot/config.GitCommit=${GitCommit}' \
-X 'github.com/krau/SaveAny-Bot/config.BuildTime=${BuildTime}' \
-X 'github.com/krau/SaveAny-Bot/config.Docker=true' \
" \
-o saveany-bot .

41
Dockerfile.micro Normal file
View File

@@ -0,0 +1,41 @@
# micro is a slimmed-down build of SaveAnyBot: the JS parser plugins and
# MinIO support are compiled out via build tags (see -tags below), but the
# runtime image keeps a shell and curl, unlike the pico variant.
FROM golang:alpine AS builder
# Build metadata injected by CI; baked into the binary via -ldflags -X.
ARG VERSION="dev"
ARG GitCommit="Unknown"
ARG BuildTime="Unknown"
WORKDIR /app
# Download modules first so the module cache layer survives source changes.
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download
COPY . .
# Static build (CGO disabled); -s -w strips symbol/debug info to shrink the
# binary, and config.Docker=true lets the bot detect a Docker deployment.
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg \
CGO_ENABLED=0 \
go build -trimpath \
-tags=no_jsparser,no_minio \
-ldflags=" \
-s -w \
-X 'github.com/krau/SaveAny-Bot/config.Version=${VERSION}' \
-X 'github.com/krau/SaveAny-Bot/config.GitCommit=${GitCommit}' \
-X 'github.com/krau/SaveAny-Bot/config.BuildTime=${BuildTime}' \
-X 'github.com/krau/SaveAny-Bot/config.Docker=true' \
" \
-o saveany-bot .
# Runtime stage: minimal alpine with curl (e.g. for container health checks).
FROM alpine:latest
RUN apk add --no-cache curl
WORKDIR /app
COPY --from=builder /app/saveany-bot .
COPY entrypoint.sh .
RUN chmod +x /app/saveany-bot && \
chmod +x /app/entrypoint.sh
ENTRYPOINT ["/app/entrypoint.sh"]

36
Dockerfile.pico Normal file
View File

@@ -0,0 +1,36 @@
# pico is the minimum build of SaveAnyBot, which disables all the optional features like JS parsing and MinIO support.
FROM golang:alpine AS builder
# Build metadata injected by CI; baked into the binary via -ldflags -X.
ARG VERSION="dev"
ARG GitCommit="Unknown"
ARG BuildTime="Unknown"
WORKDIR /app
# Download modules first so the module cache layer survives source changes.
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
go mod download
COPY . .
# Fully static build: sqlite_glebarez swaps in the pure-Go SQLite driver so
# the binary runs on a scratch image with no libc.
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg \
CGO_ENABLED=0 \
go build -trimpath \
-tags=no_jsparser,no_minio,sqlite_glebarez \
-ldflags=" \
-s -w \
-X 'github.com/krau/SaveAny-Bot/config.Version=${VERSION}' \
-X 'github.com/krau/SaveAny-Bot/config.GitCommit=${GitCommit}' \
-X 'github.com/krau/SaveAny-Bot/config.BuildTime=${BuildTime}' \
-X 'github.com/krau/SaveAny-Bot/config.Docker=true' \
" \
-o saveany-bot .
FROM scratch
# scratch ships no CA bundle; copy one from the builder so outbound TLS
# (Telegram API) works. golang:alpine includes ca-certificates.
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
WORKDIR /app
# scratch has no shell, so a RUN chmod step would fail at build time; set the
# executable bit at COPY time instead (BuildKit --chmod).
COPY --from=builder --chmod=755 /app/saveany-bot .
ENTRYPOINT ["/app/saveany-bot"]

View File

@@ -14,7 +14,7 @@ import (
"github.com/krau/SaveAny-Bot/client/middleware"
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
"github.com/krau/SaveAny-Bot/config"
"github.com/ncruces/go-sqlite3/gormlite"
"github.com/krau/SaveAny-Bot/database"
)
func Init(ctx context.Context) <-chan struct{} {
@@ -39,7 +39,7 @@ func Init(ctx context.Context) <-chan struct{} {
config.C().Telegram.AppHash,
gotgproto.ClientTypeBot(config.C().Telegram.Token),
&gotgproto.ClientOpts{
Session: sessionMaker.SqlSession(gormlite.Open(config.C().DB.Session)),
Session: sessionMaker.SqlSession(database.GetDialect(config.C().DB.Session)),
DisableCopyright: true,
Middlewares: middleware.NewDefaultMiddlewares(ctx, 5*time.Minute),
Resolver: resolver,

View File

@@ -26,3 +26,20 @@ func handleCancelCallback(ctx *ext.Context, update *ext.Update) error {
return dispatcher.EndGroups
}
// handleCancelCmd handles the /cancel command. It expects exactly one
// argument (the task ID), forwards the cancellation request to core, and
// replies to the user with the outcome.
func handleCancelCmd(ctx *ext.Context, update *ext.Update) error {
	logger := log.FromContext(ctx)
	fields := strings.Fields(update.EffectiveMessage.Text)
	if len(fields) < 2 {
		ctx.Reply(update, ext.ReplyTextString("用法: /cancel <task_id>"), nil)
		return dispatcher.EndGroups
	}
	id := fields[1]
	err := core.CancelTask(ctx, id)
	if err != nil {
		logger.Errorf("failed to cancel task %s: %v", id, err)
		ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
		return dispatcher.EndGroups
	}
	ctx.Reply(update, ext.ReplyTextString("已请求取消任务: "+id), nil)
	return dispatcher.EndGroups
}

View File

@@ -28,6 +28,8 @@ var CommandHandlers = []DescCommandHandler{
{"rule", "管理自动存储规则", handleRuleCmd},
{"save", "保存文件", handleSilentMode(handleSaveCmd, handleSilentSaveReplied)},
{"dl", "下载给定链接的文件", handleDlCmd},
{"task", "管理任务队列", handleTaskCmd},
{"cancel", "取消任务", handleCancelCmd},
{"watch", "监听聊天(UserBot)", handleWatchCmd},
{"unwatch", "取消监听聊天(UserBot)", handleUnwatchCmd},
{"lswatch", "列出监听的聊天(UserBot)", handleLswatchCmd},

View File

@@ -0,0 +1,113 @@
package handlers
import (
"fmt"
"strings"
"time"
"github.com/celestix/gotgproto/dispatcher"
"github.com/celestix/gotgproto/ext"
"github.com/charmbracelet/log"
"github.com/gotd/td/telegram/message/styling"
"github.com/krau/SaveAny-Bot/core"
)
// handleTaskCmd handles the /task command for inspecting and managing the
// task queue:
//
//	/task                  list running tasks (default)
//	/task running|run|r    list running tasks
//	/task queued|queue|q   list queued tasks
//	/task cancel <task_id> request cancellation of a task
func handleTaskCmd(ctx *ext.Context, update *ext.Update) error {
	logger := log.FromContext(ctx)
	args := strings.Fields(update.EffectiveMessage.Text)
	if len(args) == 1 {
		showRunningTasks(ctx, update)
		return dispatcher.EndGroups
	}
	switch args[1] {
	case "running", "run", "r":
		showRunningTasks(ctx, update)
	case "queued", "queue", "q", "waiting":
		showQueuedTasks(ctx, update)
	case "cancel", "c":
		if len(args) < 3 {
			// The command is registered as "task", so the usage text must
			// say /task (it previously said /tasks, which does not exist).
			ctx.Reply(update, ext.ReplyTextString("用法: /task cancel <task_id>"), nil)
			return dispatcher.EndGroups
		}
		taskID := args[2]
		if err := core.CancelTask(ctx, taskID); err != nil {
			logger.Errorf("取消任务 %s 失败: %v", taskID, err)
			ctx.Reply(update, ext.ReplyTextString("取消任务失败: "+err.Error()), nil)
			return dispatcher.EndGroups
		}
		ctx.Reply(update, ext.ReplyTextStyledTextArray([]styling.StyledTextOption{
			styling.Plain("已请求取消任务: "),
			styling.Code(taskID),
		}), nil)
	default:
		ctx.Reply(update, ext.ReplyTextString("用法: /task [running|queued|cancel <task_id>]"), nil)
	}
	return dispatcher.EndGroups
}
// showRunningTasks replies with a styled summary of every currently running
// task (ID, title, creation time, status), or a notice when none are running.
func showRunningTasks(ctx *ext.Context, update *ext.Update) {
	running := core.GetRunningTasks(ctx)
	if len(running) == 0 {
		ctx.Reply(update, ext.ReplyTextString("当前没有正在运行的任务"), nil)
		return
	}
	parts := make([]styling.StyledTextOption, 0, 2+len(running)*8)
	parts = append(parts,
		styling.Bold("当前正在运行的任务:"),
		styling.Plain(fmt.Sprintf("\n总数: %d\n", len(running))),
	)
	for _, info := range running {
		state := "运行中"
		if info.Cancelled {
			state = "已请求取消"
		}
		parts = append(parts,
			styling.Plain("\nID: "),
			styling.Code(info.ID),
			styling.Plain("\n名称: "),
			styling.Code(info.Title),
			styling.Plain("\n创建时间: "),
			styling.Code(info.Created.In(time.Local).Format("2006-01-02 15:04:05")),
			styling.Plain("\n状态: "),
			styling.Code(state),
		)
	}
	ctx.Reply(update, ext.ReplyTextStyledTextArray(parts), nil)
}
// showQueuedTasks replies with a styled summary of queued tasks. To keep the
// message bounded, only the first maxShown tasks are listed when more are
// queued, followed by a truncation note.
func showQueuedTasks(ctx *ext.Context, update *ext.Update) {
	tasks := core.GetQueuedTasks(ctx)
	if len(tasks) == 0 {
		ctx.Reply(update, ext.ReplyTextString("当前没有排队中的任务"), nil)
		return
	}
	const maxShown = 10
	opts := make([]styling.StyledTextOption, 0, 3+min(len(tasks), maxShown)*8)
	opts = append(opts,
		styling.Bold("当前排队中的任务:"),
		styling.Plain(fmt.Sprintf("\n总数: %d\n", len(tasks))),
	)
	for i, t := range tasks {
		// Bug fix: the truncation check previously ran at the end of the
		// FIRST iteration whenever len(tasks) > 10, so only one task was
		// ever shown despite the "first 10" message. Truncate after
		// maxShown entries instead.
		if i >= maxShown {
			opts = append(opts, styling.Plain(fmt.Sprintf("\n...\n只显示前 %d 个任务, 共 %d 个任务", maxShown, len(tasks))))
			break
		}
		created := t.Created.In(time.Local).Format("2006-01-02 15:04:05")
		status := "排队中"
		if t.Cancelled {
			status = "已请求取消"
		}
		opts = append(opts,
			styling.Plain("\nID: "),
			styling.Code(t.ID),
			styling.Plain("\n名称: "),
			styling.Code(t.Title),
			styling.Plain("\n创建时间: "),
			styling.Code(created),
			styling.Plain("\n状态: "),
			styling.Code(status),
		)
	}
	ctx.Reply(update, ext.ReplyTextStyledTextArray(opts), nil)
}

View File

@@ -38,6 +38,7 @@ func handleUpdateCmd(ctx *ext.Context, u *ext.Update) error {
ctx.Reply(u, ext.ReplyTextString(fmt.Sprintf("当前已经是最新版本: %s", config.Version)), nil)
return dispatcher.EndGroups
}
indocker := config.Docker == "true"
ctx.Sender.To(u.GetUserChat().AsInputPeer()).StyledText(ctx, html.String(nil, func() string {
md := latest.ReleaseNotes
md = regexp.MustCompile(`(?m)^###\s+&nbsp;&nbsp;&nbsp;(.+)$`).ReplaceAllString(md, "<b>$1</b>")
@@ -53,6 +54,15 @@ func handleUpdateCmd(ctx *ext.Context, u *ext.Update) error {
return `<blockquote expandable>` + md + `</blockquote>`
}()))
if indocker {
text := fmt.Sprintf("发现新版本: %s\n当前版本: %s\n发布时间: %s\n由于您正在使用 Docker 部署, 请自行在部署平台上执行更新命令",
latest.Version,
config.Version,
latest.PublishedAt.Format("2006-01-02 15:04:05"),
)
ctx.Reply(u, ext.ReplyTextString(text), nil)
return dispatcher.EndGroups
}
text := fmt.Sprintf(`发现新版本: %s
当前版本: %s

View File

@@ -17,7 +17,6 @@ import (
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/database"
"github.com/ncruces/go-sqlite3/gormlite"
)
var uc *gotgproto.Client
@@ -64,7 +63,7 @@ func Login(ctx context.Context) (*gotgproto.Client, error) {
config.C().Telegram.AppHash,
gotgproto.ClientTypePhone(""),
&gotgproto.ClientOpts{
Session: sessionMaker.SqlSession(gormlite.Open(config.C().Telegram.Userbot.Session)),
Session: sessionMaker.SqlSession(database.GetDialect(config.C().Telegram.Userbot.Session)),
AuthConversator: &terminalAuthConversator{},
Context: ctx,
DisableCopyright: true,

View File

@@ -1,3 +1,5 @@
// [TODO] complete the i18n support
package i18n
import (

View File

@@ -12,6 +12,11 @@ type TelegramStorageConfig struct {
ForceFile bool `toml:"force_file" mapstructure:"force_file" json:"force_file"`
RateLimit int `toml:"rate_limit" mapstructure:"rate_limit" json:"rate_limit"`
RateBurst int `toml:"rate_burst" mapstructure:"rate_burst" json:"rate_burst"`
SkipLarge bool `toml:"skip_large" mapstructure:"skip_large" json:"skip_large"` // skip files larger than Telegram limit(2GB)
// split files larger than Telegram limit(2GB) into parts of specified size, in MB, leave 0 to set default(2000MB)
// only effective when SkipLarge is false
// use zip when splitting
SplitSizeMB int64 `toml:"split_size_mb" mapstructure:"split_size_mb" json:"split_size_mb"`
}
func (m *TelegramStorageConfig) Validate() error {

View File

@@ -6,8 +6,9 @@ var (
Version string = "dev"
BuildTime string = "unknown"
GitCommit string = "unknown"
Docker string = "false" // whether built inside Docker
)
const (
GitRepo = "krau/SaveAny-Bot"
)
)

View File

@@ -10,15 +10,16 @@ import (
"github.com/krau/SaveAny-Bot/pkg/queue"
)
var queueInstance *queue.TaskQueue[Exectable]
var queueInstance *queue.TaskQueue[Executable]
type Exectable interface {
type Executable interface {
Type() tasktype.TaskType
Title() string
TaskID() string
Execute(ctx context.Context) error
}
func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan struct{}) {
func worker(ctx context.Context, qe *queue.TaskQueue[Executable], semaphore chan struct{}) {
logger := log.FromContext(ctx)
execHooks := config.C().Hook.Exec
for {
@@ -28,27 +29,27 @@ func worker(ctx context.Context, qe *queue.TaskQueue[Exectable], semaphore chan
logger.Error("Failed to get task from queue:", err)
break // queue closed and empty
}
task := qtask.Data
logger.Infof("Processing task: %s", task.TaskID())
exe := qtask.Data
logger.Infof("Processing task: %s", exe.TaskID())
if err := ExecCommandString(qtask.Context(), execHooks.TaskBeforeStart); err != nil {
logger.Errorf("Failed to execute before start hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute before start hook for task %s: %v", exe.TaskID(), err)
}
if err := task.Execute(qtask.Context()); err != nil {
if err := exe.Execute(qtask.Context()); err != nil {
if errors.Is(err, context.Canceled) {
logger.Infof("Task %s was canceled", task.TaskID())
logger.Infof("Task %s was canceled", exe.TaskID())
if err := ExecCommandString(ctx, execHooks.TaskCancel); err != nil {
logger.Errorf("Failed to execute cancel hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute cancel hook for task %s: %v", exe.TaskID(), err)
}
} else {
logger.Errorf("Failed to execute task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute task %s: %v", exe.TaskID(), err)
if err := ExecCommandString(ctx, execHooks.TaskFail); err != nil {
logger.Errorf("Failed to execute fail hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute fail hook for task %s: %v", exe.TaskID(), err)
}
}
} else {
logger.Infof("Task %s completed successfully", task.TaskID())
logger.Infof("Task %s completed successfully", exe.TaskID())
if err := ExecCommandString(ctx, execHooks.TaskSuccess); err != nil {
logger.Errorf("Failed to execute success hook for task %s: %v", task.TaskID(), err)
logger.Errorf("Failed to execute success hook for task %s: %v", exe.TaskID(), err)
}
}
qe.Done(qtask.ID)
@@ -60,7 +61,7 @@ func Run(ctx context.Context) {
log.FromContext(ctx).Info("Start processing tasks...")
semaphore := make(chan struct{}, config.C().Workers)
if queueInstance == nil {
queueInstance = queue.NewTaskQueue[Exectable]()
queueInstance = queue.NewTaskQueue[Executable]()
}
for range config.C().Workers {
go worker(ctx, queueInstance, semaphore)
@@ -68,8 +69,8 @@ func Run(ctx context.Context) {
}
func AddTask(ctx context.Context, task Exectable) error {
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task))
func AddTask(ctx context.Context, task Executable) error {
return queueInstance.Add(queue.NewTask(ctx, task.TaskID(), task.Title(), task))
}
func CancelTask(ctx context.Context, id string) error {
@@ -78,8 +79,13 @@ func CancelTask(ctx context.Context, id string) error {
}
func GetLength(ctx context.Context) int {
if queueInstance == nil {
return 0
}
return queueInstance.ActiveLength()
}
func GetRunningTasks(ctx context.Context) []queue.TaskInfo {
return queueInstance.RunningTasks()
}
func GetQueuedTasks(ctx context.Context) []queue.TaskInfo {
return queueInstance.QueuedTasks()
}

View File

@@ -8,12 +8,15 @@ import (
"sync/atomic"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/tfile"
"github.com/krau/SaveAny-Bot/storage"
"github.com/rs/xid"
)
var _ core.Executable = (*Task)(nil)
type TaskElement struct {
ID string
Storage storage.Storage
@@ -36,6 +39,11 @@ type Task struct {
failed map[string]error // [TODO] errors for each element
}
// Title implements core.Executable. It summarizes the Telegram-file batch as
// "[type](N files/SIZE MB)" for display in task listings and logs.
func (t *Task) Title() string {
	return fmt.Sprintf("[%s](%d files/%.2fMB)", t.Type(), len(t.elems), float64(t.totalSize)/(1024*1024))
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeTgfiles
}

View File

@@ -2,11 +2,13 @@ package directlinks
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/storage"
)
@@ -25,6 +27,8 @@ func (f *File) FileSize() int64 {
return f.Size
}
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
ctx context.Context
@@ -44,6 +48,11 @@ type Task struct {
failed map[string]error // [TODO] errors for each file
}
// Title implements core.Executable. It renders a short human-readable label
// of the form "[type](first-file...->storage:path)" for task listings.
func (t *Task) Title() string {
	// Guard against an empty file list so Title never panics when rendered
	// in logs or the /task listing.
	if len(t.files) == 0 {
		return fmt.Sprintf("[%s](no files->%s:%s)", t.Type(), t.Storage.Name(), t.StorPath)
	}
	return fmt.Sprintf("[%s](%s...->%s:%s)", t.Type(), t.files[0].Name, t.Storage.Name(), t.StorPath)
}
// DownloadedBytes implements TaskInfo.
func (t *Task) DownloadedBytes() int64 {
return t.downloadedBytes.Load()

View File

@@ -2,17 +2,21 @@ package parsed
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"github.com/krau/SaveAny-Bot/common/utils/netutil"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/parser"
"github.com/krau/SaveAny-Bot/storage"
)
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
Ctx context.Context
@@ -20,8 +24,8 @@ type Task struct {
StorPath string
item *parser.Item
httpClient *http.Client // [TODO] btorrent support?
progress ProgressTracker
stream bool
progress ProgressTracker
stream bool
totalResources int64
downloaded atomic.Int64 // downloaded resources count
@@ -32,6 +36,11 @@ type Task struct {
failed map[string]error // [TODO] errors for each resource
}
// Title implements core.Executable. It labels the task as
// "[type](item-title->storage:path)" for display in task listings.
func (t *Task) Title() string {
	return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.item.Title, t.Stor.Name(), t.StorPath)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeParseditem
}

View File

@@ -2,13 +2,17 @@ package telegraph
import (
"context"
"fmt"
"sync/atomic"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/telegraph"
"github.com/krau/SaveAny-Bot/storage"
)
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
Ctx context.Context
@@ -24,6 +28,11 @@ type Task struct {
downloaded atomic.Int64
}
// Title implements core.Executable. It labels the Telegraph task as
// "[type](telegraph-path->storage:path)" for display in task listings.
func (t *Task) Title() string {
	return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.PhPath, t.Stor.Name(), t.StorPath)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeTphpics
}

View File

@@ -6,11 +6,14 @@ import (
"path/filepath"
"github.com/krau/SaveAny-Bot/config"
"github.com/krau/SaveAny-Bot/core"
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
"github.com/krau/SaveAny-Bot/pkg/tfile"
"github.com/krau/SaveAny-Bot/storage"
)
var _ core.Executable = (*Task)(nil)
type Task struct {
ID string
Ctx context.Context
@@ -22,6 +25,11 @@ type Task struct {
localPath string
}
// Title implements core.Executable. It labels the task as
// "[type](file-name->storage:path)" for display in task listings.
func (t *Task) Title() string {
	return fmt.Sprintf("[%s](%s->%s:%s)", t.Type(), t.File.Name(), t.Storage.Name(), t.Path)
}
func (t *Task) Type() tasktype.TaskType {
return tasktype.TaskTypeTgfiles
}

View File

@@ -9,8 +9,6 @@ import (
"github.com/charmbracelet/log"
"github.com/krau/SaveAny-Bot/config"
_ "github.com/ncruces/go-sqlite3/embed"
"github.com/ncruces/go-sqlite3/gormlite"
"gorm.io/gorm"
glogger "gorm.io/gorm/logger"
)
@@ -23,7 +21,7 @@ func Init(ctx context.Context) {
logger.Fatal("Failed to create data directory: ", err)
}
var err error
db, err = gorm.Open(gormlite.Open(config.C().DB.Path), &gorm.Config{
db, err = gorm.Open(GetDialect(config.C().DB.Path), &gorm.Config{
Logger: glogger.New(logger, glogger.Config{
Colorful: true,
SlowThreshold: time.Second * 5,

13
database/driver.go Normal file
View File

@@ -0,0 +1,13 @@
//go:build !sqlite_glebarez
package database
import (
_ "github.com/ncruces/go-sqlite3/embed"
"github.com/ncruces/go-sqlite3/gormlite"
"gorm.io/gorm"
)
// GetDialect returns the GORM dialector for the given SQLite DSN. This is the
// default build (without the sqlite_glebarez tag), backed by the CGO-free
// ncruces/go-sqlite3 driver with its embedded SQLite.
func GetDialect(dsn string) gorm.Dialector {
	return gormlite.Open(dsn)
}

View File

@@ -0,0 +1,12 @@
//go:build sqlite_glebarez
package database
import (
"github.com/glebarez/sqlite"
"gorm.io/gorm"
)
// GetDialect returns the GORM dialector for the given SQLite DSN. This
// variant is selected by the sqlite_glebarez build tag (used by the pico
// image) and uses the pure-Go glebarez/sqlite driver.
func GetDialect(dsn string) gorm.Dialector {
	return sqlite.Open(dsn)
}

View File

@@ -44,6 +44,15 @@ Stream 模式对于磁盘空间有限的部署环境十分有用, 但也有一
- `workers`: 同时处理任务数量, 默认为 3
- `threads`: 下载文件时使用的线程数, 默认为 4. 仅在未启用 Stream 模式时生效.
- `retry`: 任务失败时的重试次数, 默认为 3.
- `proxy`: 全局代理配置, 配置后程序内一切网络连接将会尝试使用该代理, 可选.
```toml
stream = false
workers = 3
threads = 4
retry = 3
proxy = "socks5://127.0.0.1:7890"
```
### Telegram 配置

View File

@@ -46,15 +46,29 @@ base_path = "/path/to/webdav" # WebDAV 中的基础路径, 所有文件将存储
`type=s3`
```toml
endpoint = "s3.example.com" # S3 的端点
endpoint = "s3.example.com" # S3 的端点, 默认为 aws S3 的端点
region = "us-east-1" # S3 的区域
access_key_id = "your_access_key_id" # S3 的访问密钥 ID
secret_access_key = "your_secret_access_key" # S3 的秘密访问密钥
bucket_name = "your_bucket_name" # S3 的存储桶名称
use_ssl = true # 是否使用 SSL, 默认为 true
base_path = "/path/to/s3" # S3 中的基础路径, 所有文件将存储在此路径下
virtual_host = false # 使用虚拟主机风格的 URL, 默认为 false
```
虚拟主机风格的 URL 示例:
```
https://your_bucket_name.s3.example.com/path/to/s3/your_file
```
路径风格(关闭 virtual_host)的 URL 示例:
```
https://s3.example.com/your_bucket_name/path/to/s3/your_file
```
如果你使用的是第三方的兼容 S3 的服务, 一般使用的是路径风格的 URL. 而 AWS S3 则通常使用虚拟主机风格的 URL. 详情请参考你所使用的 S3 兼容服务的文档.
## Telegram
`type=telegram`
@@ -63,4 +77,7 @@ base_path = "/path/to/s3" # S3 中的基础路径, 所有文件将存储在此
```toml
chat_id = "123456789" # Telegram 聊天 ID, Bot 将把文件发送到这个聊天
force_file = false # 是否强制使用文件方式发送, 默认为 false.
skip_large = true # 是否跳过大文件, 默认为 true. 如果启用, 超过 Telegram 限制的文件将不会上传.
split_size_mb = 2000 # 分卷大小, 单位 MB, 默认为 2000 MB (2 GB). 超过该大小的文件将被分割成多个部分上传.(使用 zip 格式)
```

View File

@@ -129,17 +129,39 @@ docker run -d --name saveany-bot \
ghcr.io/krau/saveany-bot:latest
```
{{< hint info >}}
关于 docker 镜像的变体版本
<br />
<ul>
<li>默认版本: 包含所有功能和依赖, 体积较大. 如果没有特殊需要, 请使用此版本</li>
<li>micro: 精简版本, 去除部分可选依赖, 体积较小</li>
<li>pico: 极简版本, 仅包含核心功能, 体积最小</li>
</ul>
你可以根据需要, 通过指定不同的标签来拉取合适的版本, 例如: <code>ghcr.io/krau/saveany-bot:micro</code>
<br />
关于变体版本的更详细的区别, 请参考项目根目录下的 Dockerfile 文件.
{{< /hint >}}
## 更新
向 Bot 发送 `/update` 指令检查更新并升级, 或者使用 CLI 命令更新:
若使用预编译二进制文件部署, 使用以下 CLI 命令更新:
```bash
./saveany-bot up
```
如果是 Docker 部署, 还可以使用以下命令更新:
如果是 Docker 部署, 使用以下命令更新:
docker:
```bash
docker pull ghcr.io/krau/saveany-bot:latest
docker restart saveany-bot
```
docker compose:
```bash
docker compose pull
docker compose restart
```

20
go.mod
View File

@@ -3,10 +3,6 @@ module github.com/krau/SaveAny-Bot
go 1.24.0
require (
github.com/aws/aws-sdk-go-v2 v1.40.1
github.com/aws/aws-sdk-go-v2/config v1.32.3
github.com/aws/aws-sdk-go-v2/credentials v1.19.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.93.0
github.com/blang/semver v3.5.1+incompatible
github.com/celestix/gotgproto v1.0.0-beta22
github.com/cenkalti/backoff/v4 v4.3.0
@@ -31,20 +27,6 @@ require (
require (
github.com/AnimeKaizoku/cacher v1.0.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.15 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.6 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.11 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.3 // indirect
github.com/aws/smithy-go v1.24.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -125,7 +107,7 @@ require (
github.com/dop251/goja v0.0.0-20251008123653-cf18d89f3cf6
github.com/duke-git/lancet/v2 v2.3.7
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/glebarez/sqlite v1.11.0 // indirect
github.com/glebarez/sqlite v1.11.0
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/mitchellh/mapstructure v1.5.0

58
go.sum
View File

@@ -4,44 +4,30 @@ github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg
github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/aws/aws-sdk-go-v2 v1.40.1 h1:difXb4maDZkRH0x//Qkwcfpdg1XQVXEAEs2DdXldFFc=
github.com/aws/aws-sdk-go-v2 v1.40.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4/go.mod h1:IOAPF6oT9KCsceNTvvYMNHy0+kMF8akOjeDvPENWxp4=
github.com/aws/aws-sdk-go-v2/config v1.32.3 h1:cpz7H2uMNTDa0h/5CYL5dLUEzPSLo2g0NkbxTRJtSSU=
github.com/aws/aws-sdk-go-v2/config v1.32.3/go.mod h1:srtPKaJJe3McW6T/+GMBZyIPc+SeqJsNPJsd4mOYZ6s=
github.com/aws/aws-sdk-go-v2/credentials v1.19.3 h1:01Ym72hK43hjwDeJUfi1l2oYLXBAOR8gNSZNmXmvuas=
github.com/aws/aws-sdk-go-v2/credentials v1.19.3/go.mod h1:55nWF/Sr9Zvls0bGnWkRxUdhzKqj9uRNlPvgV1vgxKc=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.15 h1:utxLraaifrSBkeyII9mIbVwXXWrZdlPO7FIKmyLCEcY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.15/go.mod h1:hW6zjYUDQwfz3icf4g2O41PHi77u10oAzJ84iSzR/lo=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 h1:9KxtdcIA/5xPNQyZRgUSpYOE6j9Bc4+D7nZua0KGYOM=
github.com/aws/aws-sdk-go-v2/credentials v1.17.67/go.mod h1:p3C44m+cfnbv763s52gCqrjaqyPikj9Sg47kUVaNZQQ=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.75 h1:S61/E3N01oral6B3y9hZ2E1iFDqCZPPOBoBQretCnBI=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.75/go.mod h1:bDMQbkI1vJbNjnvJYpPTSNYBkI/VIv18ngWb/K84tkk=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.15 h1:Y5YXgygXwDI5P4RkteB5yF7v35neH7LfJKBG+hzIons=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.15/go.mod h1:K+/1EpG42dFSY7CBj+Fruzm8PsCGWTXJ3jdeJ659oGQ=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.15 h1:AvltKnW9ewxX2hFmQS0FyJH93aSvJVUEFvXfU+HWtSE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.15/go.mod h1:3I4oCdZdmgrREhU74qS1dK9yZ62yumob+58AbFR4cQA=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.15 h1:NLYTEyZmVZo0Qh183sC8nC+ydJXOOeIL/qI/sS3PdLY=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.15/go.mod h1:Z803iB3B0bc8oJV8zH2PERLRfQUJ2n2BXISpsA4+O1M=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.6 h1:P1MU/SuhadGvg2jtviDXPEejU3jBNhoeeAlRadHzvHI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.6/go.mod h1:5KYaMG6wmVKMFBSfWoyG/zH8pWwzQFnKgpoSRlXHKdQ=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.15 h1:3/u/4yZOffg5jdNk1sDpOQ4Y+R6Xbh+GzpDrSZjuy3U=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.15/go.mod h1:4Zkjq0FKjE78NKjabuM4tRXKFzUJWXgP0ItEZK8l7JU=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.15 h1:wsSQ4SVz5YE1crz0Ap7VBZrV4nNqZt4CIBBT8mnwoNc=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.15/go.mod h1:I7sditnFGtYMIqPRU1QoHZAUrXkGp4SczmlLwrNPlD0=
github.com/aws/aws-sdk-go-v2/service/s3 v1.93.0 h1:IrbE3B8O9pm3lsg96AXIN5MXX4pECEuExh/A0Du3AuI=
github.com/aws/aws-sdk-go-v2/service/s3 v1.93.0/go.mod h1:/sJLzHtiiZvs6C1RbxS/anSAFwZD6oC6M/kotQzOiLw=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.3 h1:d/6xOGIllc/XW1lzG9a4AUBMmpLA9PXcQnVPTuHHcik=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.3/go.mod h1:fQ7E7Qj9GiW8y0ClD7cUJk3Bz5Iw8wZkWDHsTe8vDKs=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.6 h1:8sTTiw+9yuNXcfWeqKF2x01GqCF49CpP4Z9nKrrk/ts=
github.com/aws/aws-sdk-go-v2/service/sso v1.30.6/go.mod h1:8WYg+Y40Sn3X2hioaaWAAIngndR8n1XFdRPPX+7QBaM=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.11 h1:E+KqWoVsSrj1tJ6I/fjDIu5xoS2Zacuu1zT+H7KtiIk=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.11/go.mod h1:qyWHz+4lvkXcr3+PoGlGHEI+3DLLiU6/GdrFfMaAhB0=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.3 h1:tzMkjh0yTChUqJDgGkcDdxvZDSrJ/WB6R6ymI5ehqJI=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.3/go.mod h1:T270C0R5sZNLbWUe8ueiAF42XSZxxPocTaGSgs5c/60=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34 h1:ZNTqv4nIdE/DiBfUUfXcLZ/Spcuz+RjeziUtNJackkM=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.34/go.mod h1:zf7Vcd1ViW7cPqYWEHLHJkS50X0JS2IKz9Cgaj6ugrs=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1 h1:4nm2G6A4pV9rdlWzGMPv4BNtQp22v1hg3yrtkYpeLl8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.1/go.mod h1:iu6FSzgt+M2/x3Dk8zhycdIcHjEFb36IS8HVUVFoMg0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 h1:moLQUoVq91LiqT1nbvzDukyqAlCv89ZmwaHw/ZFlFZg=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15/go.mod h1:ZH34PJUc8ApjBIfgQCFvkWcUDBtl/WTD+uiYHjd8igA=
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 h1:BRXS0U76Z8wfF+bnkilA2QwpIch6URlm++yPUt9QPmQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3/go.mod h1:bNXKFFyaiVvWuR6O16h/I1724+aXe/tAkA9/QS01t5k=
github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk=
github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=

View File

@@ -1,3 +1,5 @@
//go:build !no_jsparser
package js
import (

View File

@@ -1,3 +1,5 @@
//go:build !no_jsparser && !no_playwright
package js
import (

View File

@@ -1,4 +1,4 @@
//go:build no_playwright
//go:build no_playwright && !no_jsparser
package js

View File

@@ -1,3 +1,5 @@
//go:build !no_jsparser
package js
import (

16
parsers/js/js_stub.go Normal file
View File

@@ -0,0 +1,16 @@
//go:build no_jsparser
package js
import (
"context"
"errors"
)
// LoadPlugins is a stub compiled under the no_jsparser build tag; it
// always returns an error because JS parser plugin support is
// compiled out of this build.
func LoadPlugins(ctx context.Context, dir string) error {
	return errors.New("JS parser plugins are not supported in this build")
}
// AddPlugin is a stub compiled under the no_jsparser build tag; it
// always returns an error because JS parser plugin support is
// compiled out of this build.
func AddPlugin(ctx context.Context, code string, name string) error {
	return errors.New("JS parser plugins are not supported in this build")
}

View File

@@ -1,3 +1,5 @@
//go:build !no_jsparser
package js
import "github.com/blang/semver"

View File

@@ -38,7 +38,7 @@ func (tq *TaskQueue[T]) Add(task *Task[T]) error {
return fmt.Errorf("task with ID %s already exists", task.ID)
}
if task.IsCancelled() {
if task.Cancelled() {
return fmt.Errorf("task %s has been cancelled", task.ID)
}
@@ -50,6 +50,8 @@ func (tq *TaskQueue[T]) Add(task *Task[T]) error {
return nil
}
// Get retrieves and removes the next non-cancelled task from the queue, adding it to the running tasks.
// Blocks until a task is available or the queue is closed.
func (tq *TaskQueue[T]) Get() (*Task[T], error) {
tq.mu.Lock()
defer tq.mu.Unlock()
@@ -69,7 +71,7 @@ func (tq *TaskQueue[T]) Get() (*Task[T], error) {
tq.tasks.Remove(element)
task.element = nil
if !task.IsCancelled() {
if !task.Cancelled() {
tq.runningTaskMap[task.ID] = task
return task, nil
}
@@ -82,38 +84,21 @@ func (tq *TaskQueue[T]) Get() (*Task[T], error) {
return nil, fmt.Errorf("queue is closed and empty")
}
// Done removes the task's bookkeeping entries from both the task map
// and the running set. It does NOT cancel the task; cancellation is a
// separate operation (see CancelTask).
// NOTE(review): it also does not remove a still-queued task's list
// element — callers are expected to invoke Done only after Get has
// dequeued the task; confirm against call sites.
func (tq *TaskQueue[T]) Done(taskID string) {
	tq.mu.Lock()
	defer tq.mu.Unlock()
	delete(tq.taskMap, taskID)
	delete(tq.runningTaskMap, taskID)
}
func (tq *TaskQueue[T]) Peek() (*Task[T], error) {
tq.mu.RLock()
defer tq.mu.RUnlock()
if tq.tasks.Len() == 0 {
return nil, fmt.Errorf("queue is empty")
}
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
if !task.IsCancelled() {
return task, nil
}
}
return nil, fmt.Errorf("queue has no valid tasks")
}
func (tq *TaskQueue[T]) Length() int {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.tasks.Len()
}
// ActiveLength returns the number of non-cancelled tasks in the queue.
func (tq *TaskQueue[T]) ActiveLength() int {
tq.mu.RLock()
defer tq.mu.RUnlock()
@@ -121,13 +106,58 @@ func (tq *TaskQueue[T]) ActiveLength() int {
count := 0
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
if !task.IsCancelled() {
if !task.Cancelled() {
count++
}
}
return count
}
// RunningTasks returns a snapshot of info for the currently running
// tasks (those handed out by Get and not yet removed via Done),
// skipping any that have been cancelled. Order is unspecified since it
// iterates a map.
func (tq *TaskQueue[T]) RunningTasks() []TaskInfo {
	tq.mu.RLock()
	defer tq.mu.RUnlock()
	tasks := make([]TaskInfo, 0, len(tq.runningTaskMap))
	for _, task := range tq.runningTaskMap {
		if task.Cancelled() {
			continue
		}
		tasks = append(tasks, TaskInfo{
			ID:    task.ID,
			Title: task.Title,
			Created: task.created,
			// Normally false here given the guard above; re-read to
			// reflect a cancellation racing with this snapshot.
			Cancelled: task.Cancelled(),
		})
	}
	return tasks
}
// QueuedTasks returns a snapshot of info for the queued (not yet
// running) non-cancelled tasks, in the order they were added.
func (tq *TaskQueue[T]) QueuedTasks() []TaskInfo {
	tq.mu.RLock()
	defer tq.mu.RUnlock()
	tasks := make([]TaskInfo, 0, tq.tasks.Len())
	for element := tq.tasks.Front(); element != nil; element = element.Next() {
		task := element.Value.(*Task[T])
		if !task.Cancelled() {
			tasks = append(tasks, TaskInfo{
				ID:    task.ID,
				Title: task.Title,
				Created: task.created,
				// Always false here: cancelled tasks are filtered out.
				Cancelled: task.Cancelled(),
			})
		}
	}
	return tasks
}
// CancelTask cancels a task by its ID.
// It looks for the task in both queued and running tasks.
// [NOTE] Cancelled tasks will not be removed from the queue, but marked as cancelled. Use Done to remove them.
// [WARN] Cancelling a running task relies on the task's implementation to respect the cancellation. If the task does not check for cancellation, it may continue running.
func (tq *TaskQueue[T]) CancelTask(taskID string) error {
tq.mu.RLock()
task, exists := tq.taskMap[taskID]
@@ -144,52 +174,6 @@ func (tq *TaskQueue[T]) CancelTask(taskID string) error {
return nil
}
func (tq *TaskQueue[T]) RemoveTask(taskID string) error {
tq.mu.Lock()
defer tq.mu.Unlock()
task, exists := tq.taskMap[taskID]
if !exists {
_, exists = tq.runningTaskMap[taskID]
if exists {
delete(tq.runningTaskMap, taskID)
}
return fmt.Errorf("task %s is already running, cannot remove from queue", taskID)
}
if task.element != nil {
tq.tasks.Remove(task.element)
}
delete(tq.taskMap, taskID)
task.Cancel()
return nil
}
func (tq *TaskQueue[T]) CancelAll() {
tq.mu.RLock()
tasks := make([]*Task[T], 0, tq.tasks.Len())
for element := tq.tasks.Front(); element != nil; element = element.Next() {
tasks = append(tasks, element.Value.(*Task[T]))
}
tq.mu.RUnlock()
for _, task := range tasks {
task.Cancel()
}
}
func (tq *TaskQueue[T]) GetTask(taskID string) (*Task[T], error) {
tq.mu.RLock()
defer tq.mu.RUnlock()
task, exists := tq.taskMap[taskID]
if !exists {
return nil, fmt.Errorf("task %s does not exist", taskID)
}
return task, nil
}
func (tq *TaskQueue[T]) Close() {
tq.mu.Lock()
defer tq.mu.Unlock()
@@ -197,45 +181,3 @@ func (tq *TaskQueue[T]) Close() {
tq.closed = true
tq.cond.Broadcast()
}
func (tq *TaskQueue[T]) IsClosed() bool {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.closed
}
func (tq *TaskQueue[T]) Clear() {
tq.mu.Lock()
defer tq.mu.Unlock()
for element := tq.tasks.Front(); element != nil; element = element.Next() {
task := element.Value.(*Task[T])
task.Cancel()
}
tq.tasks.Init()
tq.taskMap = make(map[string]*Task[T])
}
func (tq *TaskQueue[T]) CleanupCancelled() int {
tq.mu.Lock()
defer tq.mu.Unlock()
removed := 0
element := tq.tasks.Front()
for element != nil {
next := element.Next()
task := element.Value.(*Task[T])
if task.IsCancelled() {
tq.tasks.Remove(element)
delete(tq.taskMap, task.ID)
removed++
}
element = next
}
return removed
}

View File

@@ -11,7 +11,7 @@ import (
// helper to create a simple Task with integer payload
func newTask(id string) *queue.Task[int] {
return queue.NewTask(context.Background(), id, 0)
return queue.NewTask(context.Background(), id, "testing", 0)
}
func TestAddAndLength(t *testing.T) {
@@ -39,37 +39,6 @@ func TestDuplicateAdd(t *testing.T) {
}
}
func TestGetAndPeek(t *testing.T) {
q := queue.NewTaskQueue[int]()
t1 := newTask("a")
t2 := newTask("b")
q.Add(t1)
q.Add(t2)
// Peek should return t1
peeked, err := q.Peek()
if err != nil {
t.Fatalf("unexpected error on Peek: %v", err)
}
if peeked.ID != "a" {
t.Fatalf("expected Peek ID 'a', got '%s'", peeked.ID)
}
// Get should return t1 then t2
first, err := q.Get()
if err != nil {
t.Fatalf("unexpected error on Get: %v", err)
}
if first.ID != "a" {
t.Fatalf("expected first Get ID 'a', got '%s'", first.ID)
}
second, err := q.Get()
if err != nil {
t.Fatalf("unexpected error on second Get: %v", err)
}
if second.ID != "b" {
t.Fatalf("expected second Get ID 'b', got '%s'", second.ID)
}
}
func TestCancelAndActiveLength(t *testing.T) {
q := queue.NewTaskQueue[int]()
t1 := newTask("1")
@@ -90,41 +59,6 @@ func TestCancelAndActiveLength(t *testing.T) {
}
}
func TestRemoveTask(t *testing.T) {
q := queue.NewTaskQueue[int]()
t1 := newTask("r1")
q.Add(t1)
if err := q.RemoveTask("r1"); err != nil {
t.Fatalf("unexpected error on RemoveTask: %v", err)
}
if q.Length() != 0 {
t.Fatalf("expected length 0 after remove, got %d", q.Length())
}
}
func TestClearAndCleanupCancelled(t *testing.T) {
q := queue.NewTaskQueue[int]()
tasks := []*queue.Task[int]{newTask("c1"), newTask("c2"), newTask("c3")}
for _, tsk := range tasks {
q.Add(tsk)
}
// Cancel one
q.CancelTask("c2")
// Cleanup cancelled
removed := q.CleanupCancelled()
if removed != 1 {
t.Fatalf("expected removed 1, got %d", removed)
}
if q.ActiveLength() != 2 {
t.Fatalf("expected active length 2 after cleanup, got %d", q.ActiveLength())
}
// Clear all
q.Clear()
if q.Length() != 0 {
t.Fatalf("expected length 0 after clear, got %d", q.Length())
}
}
func TestCloseBehavior(t *testing.T) {
q := queue.NewTaskQueue[int]()
done := make(chan struct{})

View File

@@ -8,6 +8,7 @@ import (
type Task[T any] struct {
ID string
Title string
Data T
ctx context.Context
cancel context.CancelFunc
@@ -15,10 +16,19 @@ type Task[T any] struct {
element *list.Element
}
func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
// Read-only info about a task
type TaskInfo struct {
ID string
Created time.Time
Cancelled bool
Title string
}
func NewTask[T any](ctx context.Context, id string, title string, data T) *Task[T] {
cancelCtx, cancel := context.WithCancel(ctx)
return &Task[T]{
ID: id,
Title: title,
Data: data,
ctx: cancelCtx,
cancel: cancel,
@@ -26,7 +36,7 @@ func NewTask[T any](ctx context.Context, id string, data T) *Task[T] {
}
}
func (t *Task[T]) IsCancelled() bool {
func (t *Task[T]) Cancelled() bool {
select {
case <-t.ctx.Done():
return true

221
pkg/s3/client.go Normal file
View File

@@ -0,0 +1,221 @@
package s3
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sort"
"strings"
"time"
)
// Client is a minimal S3 client that signs its own requests with AWS
// Signature Version 4 instead of depending on the AWS SDK.
type Client struct {
	endpoint   string       // base endpoint URL, e.g. https://s3.amazonaws.com
	region     string       // region used in the SigV4 signing scope
	bucket     string       // target bucket name
	accessKey  string       // AWS access key ID
	secretKey  string       // AWS secret access key
	httpClient *http.Client // transport; NewClient defaults this to http.DefaultClient
	pathStyle  bool         // path-style (endpoint/bucket/key) vs virtual-hosted addressing
}
// Config holds the settings used to build a Client.
type Config struct {
	Endpoint        string       // base endpoint; derived from Region when empty
	Region          string       // AWS region
	BucketName      string       // target bucket
	AccessKeyID     string       // AWS access key ID
	SecretAccessKey string       // AWS secret access key
	PathStyle       bool         // use path-style addressing
	HttpClient      *http.Client // defaults to http.DefaultClient
}

// ApplyDefaults fills zero-valued fields with sensible defaults:
// http.DefaultClient for the HTTP client, and the public AWS S3
// endpoint for the configured region when no endpoint is set.
func (c *Config) ApplyDefaults() {
	if c.HttpClient == nil {
		c.HttpClient = http.DefaultClient
	}
	if c.Endpoint != "" {
		return
	}
	if c.Region == "" || c.Region == "us-east-1" {
		c.Endpoint = "https://s3.amazonaws.com"
		return
	}
	c.Endpoint = fmt.Sprintf("https://s3.%s.amazonaws.com", c.Region)
}
// NewClient builds a Client from cfg after applying defaults to any
// unset fields. The error is always nil today; the signature keeps
// room for future validation.
func NewClient(cfg *Config) (*Client, error) {
	cfg.ApplyDefaults()
	client := &Client{
		endpoint:   cfg.Endpoint,
		region:     cfg.Region,
		bucket:     cfg.BucketName,
		accessKey:  cfg.AccessKeyID,
		secretKey:  cfg.SecretAccessKey,
		httpClient: cfg.HttpClient,
		pathStyle:  cfg.PathStyle,
	}
	return client, nil
}
// HeadBucket issues a signed HEAD request against the bucket root to
// verify that the bucket exists and is reachable with the configured
// credentials.
func (c *Client) HeadBucket(ctx context.Context) error {
	// Renamed from "url" to avoid shadowing the net/url package.
	target, err := c.buildURL("")
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, "HEAD", target, nil)
	if err != nil {
		return err
	}
	// HEAD has no body, so the payload hash is that of the empty input.
	if err = signRequest(req, c.region, c.accessKey, c.secretKey, hashSHA256(nil)); err != nil {
		return err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("head bucket failed: %s", resp.Status)
	}
	return nil
}
// Exists reports whether key is present in the bucket via a signed
// HEAD request. Any failure along the way (URL building, signing,
// transport) is treated as "does not exist".
func (c *Client) Exists(ctx context.Context, key string) bool {
	target, buildErr := c.buildURL(key)
	if buildErr != nil {
		return false
	}
	req, reqErr := http.NewRequestWithContext(ctx, "HEAD", target, nil)
	if reqErr != nil {
		return false
	}
	if signErr := signRequest(req, c.region, c.accessKey, c.secretKey, hashSHA256(nil)); signErr != nil {
		return false
	}
	resp, doErr := c.httpClient.Do(req)
	if doErr != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}
// Put uploads the contents of r to key via a signed PUT request. The
// payload is declared UNSIGNED-PAYLOAD so the body can be streamed
// without pre-hashing. A negative size means the length is unknown and
// no Content-Length is set.
func (c *Client) Put(ctx context.Context, key string, r io.Reader, size int64) error {
	target, err := c.buildURL(key)
	if err != nil {
		return err
	}
	req, err := http.NewRequestWithContext(ctx, "PUT", target, r)
	if err != nil {
		return err
	}
	if size >= 0 {
		req.ContentLength = size
	}
	if err = signRequest(req, c.region, c.accessKey, c.secretKey, "UNSIGNED-PAYLOAD"); err != nil {
		return err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 300 {
		return nil
	}
	return fmt.Errorf("put object failed: %s", resp.Status)
}
// buildURL returns the object URL for key using either path-style
// (endpoint/bucket/key) or virtual-hosted-style (bucket.host/key)
// addressing, according to c.pathStyle.
//
// NOTE(review): key is interpolated without percent-encoding in the
// path-style branch; keys containing spaces or other reserved
// characters would yield invalid URLs — confirm callers only pass
// URL-safe keys.
func (c *Client) buildURL(key string) (string, error) {
	if c.pathStyle {
		return fmt.Sprintf("%s/%s/%s", c.endpoint, c.bucket, key), nil
	}
	u, err := url.Parse(c.endpoint)
	if err != nil {
		return "", err
	}
	// Virtual-hosted style: prepend the bucket to the endpoint host.
	u.Host = c.bucket + "." + u.Host
	u.Path = "/" + key
	return u.String(), nil
}
func hmacSHA256(key []byte, data string) []byte {
h := hmac.New(sha256.New, key)
h.Write([]byte(data))
return h.Sum(nil)
}
func hashSHA256(data []byte) string {
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:])
}
func signRequest(req *http.Request, region, accessKey, secretKey string, payloadHash string) error {
now := time.Now().UTC()
amzDate := now.Format("20060102T150405Z")
date := now.Format("20060102")
req.Header.Set("x-amz-date", amzDate)
req.Header.Set("x-amz-content-sha256", payloadHash)
// Canonical headers
var headers []string
for k := range req.Header {
headers = append(headers, strings.ToLower(k))
}
sort.Strings(headers)
var canonicalHeaders strings.Builder
for _, k := range headers {
canonicalHeaders.WriteString(k)
canonicalHeaders.WriteString(":")
canonicalHeaders.WriteString(strings.TrimSpace(req.Header.Get(k)))
canonicalHeaders.WriteString("\n")
}
signedHeaders := strings.Join(headers, ";")
canonicalRequest := strings.Join([]string{
req.Method,
req.URL.EscapedPath(),
req.URL.RawQuery,
canonicalHeaders.String(),
signedHeaders,
payloadHash,
}, "\n")
scope := fmt.Sprintf("%s/%s/s3/aws4_request", date, region)
stringToSign := strings.Join([]string{
"AWS4-HMAC-SHA256",
amzDate,
scope,
hashSHA256([]byte(canonicalRequest)),
}, "\n")
kDate := hmacSHA256([]byte("AWS4"+secretKey), date)
kRegion := hmacSHA256(kDate, region)
kService := hmacSHA256(kRegion, "s3")
kSigning := hmacSHA256(kService, "aws4_request")
signature := hex.EncodeToString(hmacSHA256(kSigning, stringToSign))
auth := fmt.Sprintf(
"AWS4-HMAC-SHA256 Credential=%s/%s, SignedHeaders=%s, Signature=%s",
accessKey, scope, signedHeaders, signature,
)
req.Header.Set("Authorization", auth)
return nil
}

View File

@@ -1,3 +1,5 @@
//go:build !no_minio
package minio
import (

View File

@@ -0,0 +1,41 @@
//go:build no_minio
package minio
import (
"context"
"fmt"
"io"
"path"
"strings"
config "github.com/krau/SaveAny-Bot/config/storage"
storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage"
)
// Minio is the stub implementation compiled under the no_minio build
// tag; every storage operation reports that minio support is
// unavailable in this build.
type Minio struct {
}

// Init always fails because minio support is compiled out.
func (m *Minio) Init(_ context.Context, _ config.StorageConfig) error {
	return fmt.Errorf("minio storage is not supported in this build")
}

// Type reports the storage type this stub stands in for.
func (m *Minio) Type() storenum.StorageType {
	return storenum.Minio
}

// Name returns an empty name; the stub has no configured instance.
func (m *Minio) Name() string {
	return ""
}

// JoinStoragePath joins p onto an empty base path and strips any
// leading slash, keeping the same path shape as the real
// implementation.
func (m *Minio) JoinStoragePath(p string) string {
	return strings.TrimPrefix(path.Join("", p), "/")
}

// Save always fails because minio support is compiled out.
func (m *Minio) Save(_ context.Context, _ io.Reader, _ string) error {
	return fmt.Errorf("minio storage is not supported in this build")
}

// Exists always reports false in this stub build.
func (m *Minio) Exists(_ context.Context, _ string) bool {
	return false
}

View File

@@ -4,18 +4,14 @@ import (
"context"
"fmt"
"io"
"net/url"
"path"
"strings"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/charmbracelet/log"
storconfig "github.com/krau/SaveAny-Bot/config/storage"
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage"
"github.com/krau/SaveAny-Bot/pkg/s3"
"github.com/rs/xid"
)
@@ -26,71 +22,32 @@ type S3 struct {
}
func (m *S3) Init(ctx context.Context, cfg storconfig.StorageConfig) error {
s3Config, ok := cfg.(*storconfig.S3StorageConfig)
s3cfg, ok := cfg.(*storconfig.S3StorageConfig)
if !ok {
return fmt.Errorf("failed to cast s3 config")
}
if err := s3Config.Validate(); err != nil {
if err := s3cfg.Validate(); err != nil {
return err
}
m.config = *s3Config
m.config = *s3cfg
m.logger = log.FromContext(ctx).WithPrefix(fmt.Sprintf("s3[%s]", m.config.Name))
loadOpts := make([]config.LoadOptionsFunc, 0)
if m.config.Region != "" {
loadOpts = append(loadOpts, config.WithRegion(m.config.Region))
}
if endpoint := m.config.Endpoint; endpoint != "" {
if !strings.HasPrefix(endpoint, "http://") && !strings.HasPrefix(endpoint, "https://") {
if m.config.UseSSL {
endpoint = "https://" + endpoint
} else {
endpoint = "http://" + endpoint
}
}
if _, err := url.Parse(endpoint); err != nil {
return fmt.Errorf("invalid s3 endpoint %q: %w", m.config.Endpoint, err)
}
loadOpts = append(loadOpts, config.WithBaseEndpoint(endpoint))
}
loadOpts = append(loadOpts, config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(
m.config.AccessKeyID,
m.config.SecretAccessKey,
"",
),
))
awsCfg, err := config.LoadDefaultConfig(
ctx,
func() []func(*config.LoadOptions) error {
// wtf aws sdk
// https://github.com/aws/aws-sdk-go-v2/issues/2193
funcs := make([]func(*config.LoadOptions) error, 0)
for _, fn := range loadOpts {
funcs = append(funcs, fn)
}
return funcs
}()...,
)
if err != nil {
return fmt.Errorf("failed to load AWS config: %w", err)
}
m.client = s3.NewFromConfig(awsCfg, func(o *s3.Options) {
// Path style: https://s3.amazonaws.com/mybucket/path/to/file.jpg
// virtual hosted style: https://mybucket.s3.amazonaws.com/path/to/file.jpg
o.UsePathStyle = !m.config.VirtualHost
client, err := s3.NewClient(&s3.Config{
Endpoint: m.config.Endpoint,
Region: m.config.Region,
AccessKeyID: m.config.AccessKeyID,
SecretAccessKey: m.config.SecretAccessKey,
BucketName: m.config.BucketName,
PathStyle: !m.config.VirtualHost,
})
if err != nil {
return fmt.Errorf("failed to create s3 client: %w", err)
}
m.client = client
// Check if bucket exists
_, err = m.client.HeadBucket(ctx, &s3.HeadBucketInput{
Bucket: aws.String(m.config.BucketName),
})
if err != nil {
if err := m.client.HeadBucket(ctx); err != nil {
return fmt.Errorf("bucket %s not accessible: %w", m.config.BucketName, err)
}
return nil
}
@@ -131,18 +88,7 @@ func (m *S3) Save(ctx context.Context, r io.Reader, storagePath string) error {
}
}
// S3 PutObject needs either size or StreamingBody
input := &s3.PutObjectInput{
Bucket: aws.String(m.config.BucketName),
Key: aws.String(candidate),
Body: r,
}
if size >= 0 {
input.ContentLength = &size
}
_, err := m.client.PutObject(ctx, input)
err := m.client.Put(ctx, candidate, r, size)
if err != nil {
return fmt.Errorf("failed to upload file to S3: %w", err)
}
@@ -153,10 +99,5 @@ func (m *S3) Save(ctx context.Context, r io.Reader, storagePath string) error {
func (m *S3) Exists(ctx context.Context, storagePath string) bool {
m.logger.Debugf("Checking if file exists at %s", storagePath)
_, err := m.client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(m.config.BucketName),
Key: aws.String(storagePath),
})
return err == nil
return m.client.Exists(ctx, storagePath)
}

View File

@@ -1,4 +1,4 @@
package s3
package s3_test
import (
"bytes"
@@ -10,6 +10,8 @@ import (
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
storconfig "github.com/krau/SaveAny-Bot/config/storage"
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
"github.com/krau/SaveAny-Bot/storage/s3"
)
func newTestContext(t *testing.T) context.Context {
@@ -19,7 +21,7 @@ func newTestContext(t *testing.T) context.Context {
return log.WithContext(ctx, logger)
}
func newFakeS3(t *testing.T) (*S3, *storconfig.S3StorageConfig) {
func newFakeS3(t *testing.T) (*s3.S3, *storconfig.S3StorageConfig) {
t.Helper()
backend := s3mem.New()
@@ -45,7 +47,7 @@ func newFakeS3(t *testing.T) (*S3, *storconfig.S3StorageConfig) {
t.Fatalf("failed to create fake bucket: %v", err)
}
s := &S3{}
s := &s3.S3{}
ctx := newTestContext(t)
if err := s.Init(ctx, cfg); err != nil {
t.Fatalf("init s3 failed: %v", err)
@@ -54,9 +56,9 @@ func newFakeS3(t *testing.T) (*S3, *storconfig.S3StorageConfig) {
return s, cfg
}
func TestS3_SaveAndExists(t *testing.T) {
func TestS3(t *testing.T) {
s, _ := newFakeS3(t)
ctx := context.Background()
ctx := t.Context()
content := []byte("hello world")
reader := bytes.NewReader(content)
@@ -69,4 +71,26 @@ func TestS3_SaveAndExists(t *testing.T) {
if !s.Exists(ctx, key) {
t.Fatalf("Exists should return true for saved key")
}
if s.Exists(ctx, "nonexistent.txt") {
t.Fatalf("Exists should return false for nonexistent key")
}
if err := s.Save(ctx, bytes.NewReader(content), key); err != nil {
t.Fatalf("Save with existing key failed: %v", err)
}
if !s.Exists(ctx, "foo/bar_1.txt") {
t.Fatalf("Exists should return true for unique renamed key")
}
var length int64 = int64(len(content))
ctx = context.WithValue(ctx, ctxkey.ContentLength, length)
if err := s.Save(ctx, bytes.NewReader(content), "size_test.txt"); err != nil {
t.Fatalf("Save with content length failed: %v", err)
}
if !s.Exists(ctx, "size_test.txt") {
t.Fatalf("Exists should return true for size_test.txt")
}
}

147
storage/telegram/split.go Normal file
View File

@@ -0,0 +1,147 @@
package telegram
import (
"archive/zip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
)
// splitWriter is an io.Writer that spreads its input across numbered
// part files (baseName.zip.001, .002, …), starting a new file once the
// current one reaches partSize bytes.
type splitWriter struct {
	baseName    string   // output path prefix for part files
	partSize    int64    // maximum bytes per part file
	currentPart int      // zero-based index of the next/current part
	currentSize int64    // bytes written to the current part so far
	currentFile *os.File // open part file, nil before the first write
	totalParts  int      // set by finalize
}
// newSplitWriter returns a splitWriter that writes part files named
// baseName.zip.001, .002, … of at most partSize bytes each.
func newSplitWriter(baseName string, partSize int64) *splitWriter {
	w := &splitWriter{}
	w.baseName = baseName
	w.partSize = partSize
	// currentPart starts at its zero value; the first Write opens part 1.
	return w
}
// Write implements io.Writer. It fills the current part file and
// transparently rolls over to a new part whenever partSize bytes have
// been written to the current one.
func (w *splitWriter) Write(p []byte) (n int, err error) {
	total := 0
	for total < len(p) {
		// Open the first part lazily, or roll over when the cap is hit.
		if w.currentFile == nil || w.currentSize >= w.partSize {
			if err := w.nextPart(); err != nil {
				return total, err
			}
		}
		chunk := min(int64(len(p)-total), w.partSize-w.currentSize)
		nw, werr := w.currentFile.Write(p[total : total+int(chunk)])
		total += nw
		w.currentSize += int64(nw)
		if werr != nil {
			return total, werr
		}
	}
	return total, nil
}
// Close closes the currently open part file, if any.
func (w *splitWriter) Close() error {
	if w.currentFile == nil {
		return nil
	}
	return w.currentFile.Close()
}
// nextPart closes the current part file (when one is open) and creates
// the next numbered part for writing, resetting the size counter.
func (w *splitWriter) nextPart() error {
	if prev := w.currentFile; prev != nil {
		if err := prev.Close(); err != nil {
			return err
		}
	}
	f, err := os.Create(w.partName(w.currentPart))
	if err != nil {
		return err
	}
	w.currentFile = f
	w.currentSize = 0
	w.currentPart++
	return nil
}
// partName returns the file name for the zero-based part index
// partNum, e.g. base.zip.001 for partNum 0.
func (w *splitWriter) partName(partNum int) string {
	// file.zip.001, file.zip.002, ...
	return fmt.Sprintf("%s.zip.%03d", w.baseName, partNum+1)
}
// finalize records the final part count and, when the archive fit into
// a single volume, renames it from base.zip.001 to base.zip.
func (w *splitWriter) finalize() error {
	w.totalParts = w.currentPart
	// With only one volume, rename it directly to a plain .zip file.
	if w.totalParts == 1 {
		oldName := fmt.Sprintf("%s.zip.001", w.baseName)
		newName := fmt.Sprintf("%s.zip", w.baseName)
		return os.Rename(oldName, newName)
	}
	return nil
}
// CreateSplitZip streams size bytes from reader into an uncompressed
// (Store method) zip archive written as split volumes named
// outputBase.zip.001, .002, … of at most partSize bytes each; when
// everything fits in one volume it is renamed to outputBase.zip.
// fileName is the entry name recorded inside the archive.
//
// NOTE(review): ctx is currently unused — cancellation is not observed
// during the copy; confirm whether that is intentional.
func CreateSplitZip(ctx context.Context, reader io.Reader, size int64, fileName, outputBase string, partSize int64) error {
	// Rewind seekable readers so the whole content is archived even if
	// the reader was partially consumed before.
	// seek the reader if possible
	if rs, ok := reader.(io.ReadSeeker); ok {
		if _, err := rs.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("failed to seek reader: %w", err)
		}
	}
	outputDir := filepath.Dir(outputBase)
	if err := os.MkdirAll(outputDir, os.ModePerm); err != nil {
		return fmt.Errorf("failed to create output directory: %w", err)
	}
	splitWriter := newSplitWriter(outputBase, partSize)
	defer splitWriter.Close()
	zipWriter := zip.NewWriter(splitWriter)
	defer zipWriter.Close()
	header := &zip.FileHeader{
		Name:     fileName,
		Method:   zip.Store, // just store without compression
		Modified: time.Now(),
	}
	writer, err := zipWriter.CreateHeader(header)
	if err != nil {
		return fmt.Errorf("failed to create zip header: %w", err)
	}
	copied, err := io.Copy(writer, reader)
	if err != nil {
		return fmt.Errorf("failed to write data: %w", err)
	}
	// Guard against short reads: the caller-supplied size must match.
	if copied != size {
		return fmt.Errorf("incomplete write: expected %d bytes, got %d bytes", size, copied)
	}
	// Close explicitly so flush errors are reported; the deferred Closes
	// above then return errors that are deliberately discarded.
	if err := zipWriter.Close(); err != nil {
		return fmt.Errorf("failed to close zip writer: %w", err)
	}
	if err := splitWriter.Close(); err != nil {
		return fmt.Errorf("failed to close split writer: %w", err)
	}
	// finalize must run after both writers are closed so the rename sees
	// complete part files.
	if err := splitWriter.finalize(); err != nil {
		return fmt.Errorf("failed to rename split files: %w", err)
	}
	return nil
}

View File

@@ -0,0 +1,55 @@
package telegram
import (
"os"
"path/filepath"
"testing"
)
// TestCreateSplitZip runs CreateSplitZip against a local fixture file
// with two different part sizes and verifies that every produced
// volume stays within the part-size cap.
//
// NOTE(review): requires a pre-existing fixture at tests/testfile.dat
// and leaves the generated split files behind (no cleanup) — consider
// generating the input and using t.TempDir.
func TestCreateSplitZip(t *testing.T) {
	input := "tests/testfile.dat"
	file, err := os.Open(input)
	if err != nil {
		t.Fatalf("failed to open test file: %v", err)
	}
	defer file.Close()
	fileName := filepath.Base(input)
	fileInfo, err := file.Stat()
	if err != nil {
		t.Fatalf("failed to stat test file: %v", err)
	}
	fileSize := fileInfo.Size()
	tests := []struct {
		partSize int64
		output   string
	}{
		{partSize: int64(1024 * 1024 * 500), output: "tests/split_test_output_500MB"},
		{partSize: int64(1024 * 1024 * 100), output: "tests/split_test_output_100MB"},
	}
	for _, tt := range tests {
		// CreateSplitZip rewinds the (seekable) file itself, so reusing
		// the same handle across iterations is safe.
		err = CreateSplitZip(t.Context(), file, fileSize, fileName, tt.output, tt.partSize)
		if err != nil {
			t.Fatalf("CreateSplitZip failed: %v", err)
		}
		// Matches both output.zip (single part) and output.zip.NNN.
		matched, err := filepath.Glob(tt.output + ".z*")
		if err != nil {
			t.Fatalf("failed to glob split files: %v", err)
		}
		if len(matched) == 0 {
			t.Fatalf("no split files found")
		}
		t.Logf("Created %d split files", len(matched))
		for _, f := range matched {
			info, err := os.Stat(f)
			if err != nil {
				t.Fatalf("failed to stat file %s: %v", f, err)
			}
			if info.Size() > tt.partSize {
				t.Errorf("file %s exceeds part size: %d > %d", f, info.Size(), tt.partSize)
			}
			t.Logf(" - %s (%d bytes)", f, info.Size())
		}
	}
}

View File

@@ -4,10 +4,13 @@ import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/celestix/gotgproto/ext"
"github.com/charmbracelet/log"
"github.com/duke-git/lancet/v2/slice"
"github.com/duke-git/lancet/v2/validator"
@@ -16,6 +19,7 @@ import (
"github.com/gotd/td/telegram/message/styling"
"github.com/gotd/td/telegram/uploader"
"github.com/gotd/td/tg"
"github.com/krau/SaveAny-Bot/common/utils/dlutil"
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
"github.com/krau/SaveAny-Bot/config"
storconfig "github.com/krau/SaveAny-Bot/config/storage"
@@ -26,6 +30,12 @@ import (
"golang.org/x/time/rate"
)
const (
// https://core.telegram.org/api/config#upload-max-fileparts-default
DefaultSplitSize = 4000 * 524288 // 4000 * 512 KB
MaxUploadFileSize = 4000 * 524288 // 4000 * 512 KB
)
type Telegram struct {
config storconfig.TelegramStorageConfig
limiter *rate.Limiter
@@ -65,22 +75,39 @@ func (t *Telegram) Exists(ctx context.Context, storagePath string) bool {
}
func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) error {
if err := t.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit failed: %w", err)
tctx := tgutil.ExtFromContext(ctx)
if tctx == nil {
return fmt.Errorf("failed to get telegram context")
}
size := func() int64 {
if length := ctx.Value(ctxkey.ContentLength); length != nil {
if l, ok := length.(int64); ok {
return l
}
}
return -1 // unknown size
}()
if t.config.SkipLarge && size > MaxUploadFileSize {
log.FromContext(ctx).Warnf("Skipping file larger than Telegram limit (%d bytes): %d bytes", MaxUploadFileSize, size)
return nil
}
rs, seekable := r.(io.ReadSeeker)
if !seekable || rs == nil {
return fmt.Errorf("reader must implement io.ReadSeeker")
}
tctx := tgutil.ExtFromContext(ctx)
if tctx == nil {
return fmt.Errorf("failed to get telegram context")
splitSize := t.config.SplitSizeMB * 1024 * 1024
if splitSize <= 0 {
splitSize = DefaultSplitSize
}
if err := t.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit failed: %w", err)
}
// 去除前导斜杠并分隔路径, 当 len(parts):
// ==0, 存储到配置文件中的 chat_id, 随机文件名
// ==1, 视作只有文件名, 存储到配置文件中的 chat_id
// ==2, parts[0]: 视作要存储到的 chat_id, parts[1]: filename
parts := slice.Compact(strings.Split(strings.TrimPrefix(storagePath, "/"), "/"))
filename := ""
chatID := t.config.ChatID
@@ -113,17 +140,13 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er
}
upler := uploader.NewUploader(tctx.Raw).
WithPartSize(tglimit.MaxUploadPartSize).
WithThreads(config.C().Threads)
WithThreads(dlutil.BestThreads(size, config.C().Threads))
if size > splitSize {
// large file, use split uploader
return t.splitUpload(tctx, rs, filename, upler, peer, size, splitSize)
}
var file tg.InputFileClass
size := func() int64 {
if length := ctx.Value(ctxkey.ContentLength); length != nil {
if l, ok := length.(int64); ok {
return l
}
}
return -1 // unknown size
}()
if size < 0 {
file, err = upler.FromReader(ctx, filename, rs)
} else {
@@ -186,3 +209,91 @@ func (t *Telegram) Save(ctx context.Context, r io.Reader, storagePath string) er
func (t *Telegram) CannotStream() string {
return "Telegram storage must use a ReaderSeeker"
}
// splitUpload stores an oversized file to Telegram by packing it into
// an uncompressed zip archive split into volumes of at most splitSize
// bytes, uploading the parts one by one, and sending them as albums of
// up to 10 documents each. Temporary split files are removed on return.
func (t *Telegram) splitUpload(ctx *ext.Context, rs io.ReadSeeker, filename string, upler *uploader.Uploader, peer tg.InputPeerClass, fileSize, splitSize int64) error {
	tempId := xid.New().String()
	// NOTE(review): uses the portion of filename before the first "." as
	// the archive base name, so "a.tar.gz" yields parts named "a.zip.NNN".
	outputBase := filepath.Join(config.C().Temp.BasePath, tempId, strings.Split(filename, ".")[0])
	defer func() {
		// cleanup temp files
		if err := os.RemoveAll(filepath.Join(config.C().Temp.BasePath, tempId)); err != nil {
			log.FromContext(ctx).Warnf("Failed to cleanup temp split files: %s", err)
		}
	}()
	if err := CreateSplitZip(ctx, rs, fileSize, filename, outputBase, splitSize); err != nil {
		return fmt.Errorf("failed to create split zip: %w", err)
	}
	// Matches both base.zip (single part after finalize) and base.zip.NNN.
	matched, err := filepath.Glob(outputBase + ".z*")
	if err != nil {
		return fmt.Errorf("failed to glob split files: %w", err)
	}
	inputFiles := make([]tg.InputFileClass, 0, len(matched))
	for _, partPath := range matched {
		// Upload parts sequentially; parallel uploads are more likely to
		// trigger Telegram's flood control.
		err = func() error {
			partFile, err := os.Open(partPath)
			if err != nil {
				return fmt.Errorf("failed to open split part %s: %w", partPath, err)
			}
			defer partFile.Close()
			partInfo, err := partFile.Stat()
			if err != nil {
				return fmt.Errorf("failed to stat split part %s: %w", partPath, err)
			}
			partFileSize := partInfo.Size()
			partName := filepath.Base(partPath)
			partInputFile, err := upler.Upload(ctx, uploader.NewUpload(partName, partFile, partFileSize))
			if err != nil {
				return fmt.Errorf("failed to upload split part %s: %w", partPath, err)
			}
			inputFiles = append(inputFiles, partInputFile)
			return nil
		}()
		if err != nil {
			return fmt.Errorf("failed to upload split part %s: %w", partPath, err)
		}
	}
	if len(inputFiles) == 1 {
		// Only one part: send it as a normal file. Should not happen in
		// practice, since this method is reached only when fileSize > splitSize.
		doc := message.UploadedDocument(inputFiles[0]).
			Filename(filepath.Base(matched[0])).
			ForceFile(true).
			MIME("application/zip")
		_, err = ctx.Sender.
			WithUploader(upler).
			To(peer).
			Media(ctx, doc)
		return err
	}
	multiMedia := make([]message.MultiMediaOption, 0, len(inputFiles))
	for i, inputFile := range inputFiles {
		doc := message.UploadedDocument(inputFile).
			Filename(filepath.Base(matched[i])).
			MIME("application/zip")
		multiMedia = append(multiMedia, doc)
	}
	sender := ctx.Sender
	if len(multiMedia) <= 10 {
		_, err = sender.WithUploader(upler).
			To(peer).
			Album(ctx, multiMedia[0], multiMedia[1:]...)
		return err
	}
	// More than 10 parts: send in batches, each batch up to 10 parts.
	for i := 0; i < len(multiMedia); i += 10 {
		end := min(i+10, len(multiMedia))
		batch := multiMedia[i:end]
		_, err = sender.WithUploader(upler).
			To(peer).
			Album(ctx, batch[0], batch[1:]...)
		if err != nil {
			return fmt.Errorf("failed to send album batch: %w", err)
		}
	}
	return nil
}