mirror of
https://github.com/krau/SaveAny-Bot.git
synced 2026-06-25 17:23:50 +08:00
Compare commits
5 Commits
v0.56.1
...
feat/watch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
27fe1ebe49 | ||
|
|
88b170acaa | ||
|
|
8059e27978 | ||
|
|
bfab4c85c8 | ||
|
|
62e4a08e28 |
@@ -66,6 +66,19 @@ func (f *TaskFactory) CreateTask(req *CreateTaskRequest) (*CreateTaskResponse, e
|
||||
}
|
||||
}
|
||||
|
||||
func (f *TaskFactory) registerAndEnqueueTask(task core.Executable, taskType tasktype.TaskType, storageName, path, webhook string) error {
|
||||
taskID := task.TaskID()
|
||||
RegisterTask(taskID, string(taskType), storageName, path, task.Title(), webhook)
|
||||
|
||||
err := core.AddTask(f.ctx, NewExecutableWrapper(task))
|
||||
if err != nil {
|
||||
DeleteTask(taskID)
|
||||
return fmt.Errorf("failed to add task: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// createDirectLinksTask 创建直链下载任务
|
||||
func (f *TaskFactory) createDirectLinksTask(taskID string, createdAt time.Time, req *CreateTaskRequest, stor storage.Storage) (*CreateTaskResponse, error) {
|
||||
var params DirectLinksParams
|
||||
@@ -79,8 +92,9 @@ func (f *TaskFactory) createDirectLinksTask(taskID string, createdAt time.Time,
|
||||
|
||||
task := directlinks.NewTask(taskID, f.ctx, params.URLs, stor, req.Path, nil)
|
||||
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
err := f.registerAndEnqueueTask(task, tasktype.TaskTypeDirectlinks, req.Storage, req.Path, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
@@ -104,8 +118,9 @@ func (f *TaskFactory) createYTDLPTask(taskID string, createdAt time.Time, req *C
|
||||
|
||||
task := ytdlp.NewTask(taskID, f.ctx, params.URLs, params.Flags, stor, req.Path, nil)
|
||||
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
err := f.registerAndEnqueueTask(task, tasktype.TaskTypeYtdlp, req.Storage, req.Path, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
@@ -146,8 +161,9 @@ func (f *TaskFactory) createAria2Task(taskID string, createdAt time.Time, req *C
|
||||
|
||||
task := aria2dl.NewTask(taskID, f.ctx, gid, params.URLs, aria2Client, stor, req.Path, nil)
|
||||
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
err = f.registerAndEnqueueTask(task, tasktype.TaskTypeAria2, req.Storage, req.Path, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
@@ -190,8 +206,9 @@ func (f *TaskFactory) createParsedTask(taskID string, createdAt time.Time, req *
|
||||
|
||||
task := parsed.NewTask(taskID, f.ctx, stor, req.Path, item, nil)
|
||||
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
err = f.registerAndEnqueueTask(task, tasktype.TaskTypeParseditem, req.Storage, req.Path, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
@@ -223,15 +240,15 @@ func (f *TaskFactory) createTGFilesTask(taskID string, createdAt time.Time, req
|
||||
return nil, fmt.Errorf("no files found in provided links")
|
||||
}
|
||||
|
||||
var task core.Executable
|
||||
|
||||
if len(files) == 1 {
|
||||
// 单个文件任务
|
||||
tfileTask, err := tfile.NewTGFileTask(taskID, f.ctx, files[0], stor, req.Path, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create tfile task: %w", err)
|
||||
}
|
||||
if err := core.AddTask(f.ctx, tfileTask); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
}
|
||||
task = tfileTask
|
||||
} else {
|
||||
// 批量文件任务
|
||||
elems := make([]batchtfile.TaskElement, 0, len(files))
|
||||
@@ -243,10 +260,12 @@ func (f *TaskFactory) createTGFilesTask(taskID string, createdAt time.Time, req
|
||||
elems = append(elems, *elem)
|
||||
}
|
||||
|
||||
task := batchtfile.NewBatchTGFileTask(taskID, f.ctx, elems, nil, true)
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
}
|
||||
task = batchtfile.NewBatchTGFileTask(taskID, f.ctx, elems, nil, true)
|
||||
}
|
||||
|
||||
err = f.registerAndEnqueueTask(task, tasktype.TaskTypeTgfiles, req.Storage, req.Path, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
@@ -281,8 +300,9 @@ func (f *TaskFactory) createTPHPicsTask(taskID string, createdAt time.Time, req
|
||||
client := telegraph.NewClient()
|
||||
task := tphtask.NewTask(taskID, f.ctx, phPath, pics, stor, req.Path, client, nil)
|
||||
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
err = f.registerAndEnqueueTask(task, tasktype.TaskTypeTphpics, req.Storage, req.Path, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
@@ -342,8 +362,9 @@ func (f *TaskFactory) createTransferTask(taskID string, createdAt time.Time, req
|
||||
|
||||
task := transfer.NewTransferTask(taskID, f.ctx, elems, nil, true)
|
||||
|
||||
if err := core.AddTask(f.ctx, task); err != nil {
|
||||
return nil, fmt.Errorf("failed to add task: %w", err)
|
||||
err = f.registerAndEnqueueTask(task, tasktype.TaskTypeTransfer, params.TargetStorage, params.TargetPath, req.Webhook)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &CreateTaskResponse{
|
||||
|
||||
@@ -30,16 +30,21 @@ func NewServer(ctx context.Context) *Server {
|
||||
mux.HandleFunc("/health", handlers.HealthCheckHandler)
|
||||
|
||||
// API v1 路由
|
||||
mux.HandleFunc("/api/v1/tasks", handlers.CreateTaskHandler)
|
||||
mux.HandleFunc("/api/v1/tasks", func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
handlers.ListTasksHandler(w, r)
|
||||
case http.MethodPost:
|
||||
handlers.CreateTaskHandler(w, r)
|
||||
default:
|
||||
MethodNotAllowedHandler(w, r)
|
||||
}
|
||||
})
|
||||
mux.HandleFunc("/api/v1/tasks/", func(w http.ResponseWriter, r *http.Request) {
|
||||
// 根据方法和路径分发
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
if r.URL.Path == "/api/v1/tasks" {
|
||||
handlers.ListTasksHandler(w, r)
|
||||
} else {
|
||||
handlers.GetTaskHandler(w, r)
|
||||
}
|
||||
handlers.GetTaskHandler(w, r)
|
||||
case http.MethodDelete:
|
||||
handlers.CancelTaskHandler(w, r)
|
||||
default:
|
||||
|
||||
58
api/wrapper.go
Normal file
58
api/wrapper.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/core"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
)
|
||||
|
||||
// ExecutableWrapper wraps core.Executable to track task status in the API store and send webhooks.
|
||||
type ExecutableWrapper struct {
|
||||
inner core.Executable
|
||||
}
|
||||
|
||||
func NewExecutableWrapper(inner core.Executable) *ExecutableWrapper {
|
||||
return &ExecutableWrapper{inner: inner}
|
||||
}
|
||||
|
||||
func (w *ExecutableWrapper) Type() tasktype.TaskType { return w.inner.Type() }
|
||||
func (w *ExecutableWrapper) Title() string { return w.inner.Title() }
|
||||
func (w *ExecutableWrapper) TaskID() string { return w.inner.TaskID() }
|
||||
|
||||
func (w *ExecutableWrapper) Execute(ctx context.Context) error {
|
||||
taskID := w.inner.TaskID()
|
||||
|
||||
if info, ok := GetTask(taskID); ok {
|
||||
info.UpdateStatus(TaskStatusRunning)
|
||||
}
|
||||
|
||||
err := w.inner.Execute(ctx)
|
||||
|
||||
info, ok := GetTask(taskID)
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
var status TaskStatus
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
status = TaskStatusCancelled
|
||||
info.UpdateStatus(TaskStatusCancelled)
|
||||
} else {
|
||||
status = TaskStatusFailed
|
||||
info.SetError(err.Error())
|
||||
}
|
||||
} else {
|
||||
status = TaskStatusCompleted
|
||||
info.UpdateStatus(TaskStatusCompleted)
|
||||
}
|
||||
|
||||
if info.Webhook != "" {
|
||||
payload := CreateWebhookPayload(taskID, info.Type, status, info.Storage, info.Path, err)
|
||||
SendWebhook(ctx, payload)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -73,14 +73,16 @@ func handleAddCallback(ctx *ext.Context, update *ext.Update) error {
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
dirPath = dir.Path
|
||||
} else if data.SelectedDirPath != "" {
|
||||
dirPath = data.SelectedDirPath
|
||||
}
|
||||
|
||||
switch data.TaskType {
|
||||
case tasktype.TaskTypeTgfiles:
|
||||
if data.AsBatch {
|
||||
return shortcut.CreateAndAddBatchTGFileTaskWithEdit(ctx, userID, selectedStorage, dirPath, data.Files, msgID)
|
||||
return shortcut.CreateAndAddBatchTGFileTaskWithEdit(ctx, userID, selectedStorage, dirPath, data.Files, msgID, data.ConflictStrategy)
|
||||
}
|
||||
return shortcut.CreateAndAddTGFileTaskWithEdit(ctx, userID, selectedStorage, dirPath, data.Files[0], msgID)
|
||||
return shortcut.CreateAndAddTGFileTaskWithEdit(ctx, userID, selectedStorage, dirPath, data.Files[0], msgID, data.ConflictStrategy)
|
||||
case tasktype.TaskTypeTphpics:
|
||||
return shortcut.CreateAndAddtelegraphWithEdit(ctx, userID, data.TphPageNode, data.TphDirPath, data.TphPics, selectedStorage, msgID)
|
||||
case tasktype.TaskTypeParseditem:
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/celestix/gotgproto/dispatcher"
|
||||
"github.com/celestix/gotgproto/ext"
|
||||
"github.com/gotd/td/tg"
|
||||
"github.com/krau/SaveAny-Bot/client/bot/handlers/utils/conflictutil"
|
||||
"github.com/krau/SaveAny-Bot/common/i18n"
|
||||
"github.com/krau/SaveAny-Bot/common/i18n/i18nk"
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
@@ -26,6 +27,10 @@ func handleConfigCmd(ctx *ext.Context, update *ext.Update) error {
|
||||
Text: i18n.T(i18nk.BotMsgConfigButtonFilenameStrategy),
|
||||
Data: fmt.Appendf(nil, "%s %s", tcbdata.TypeConfig, "fnamest"),
|
||||
},
|
||||
&tg.KeyboardButtonCallback{
|
||||
Text: i18n.T(i18nk.BotMsgConfigButtonConflictStrategy),
|
||||
Data: fmt.Appendf(nil, "%s %s", tcbdata.TypeConfig, "conflictst"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -51,6 +56,8 @@ func handleConfigCallback(ctx *ext.Context, update *ext.Update) error {
|
||||
switch args[1] {
|
||||
case "fnamest":
|
||||
return handleConfigFnameSTCallback(ctx, update)
|
||||
case "conflictst":
|
||||
return handleConfigConflictSTCallback(ctx, update)
|
||||
default:
|
||||
return invaildDataAnswer()
|
||||
}
|
||||
@@ -110,6 +117,55 @@ func handleConfigFnameSTCallback(ctx *ext.Context, update *ext.Update) error {
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
func handleConfigConflictSTCallback(ctx *ext.Context, update *ext.Update) error {
|
||||
userID := update.CallbackQuery.GetUserID()
|
||||
user, err := database.GetUserByChatID(ctx, userID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
args := strings.Fields(string(update.CallbackQuery.Data))
|
||||
if len(args) == 3 {
|
||||
selected := args[2]
|
||||
if !tcbdata.IsConflictStrategy(selected) {
|
||||
return fmt.Errorf("invalid conflict strategy: %s", selected)
|
||||
}
|
||||
user.ConflictStrategy = selected
|
||||
if err := database.UpdateUser(ctx, user); err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
ID: update.CallbackQuery.GetMsgID(),
|
||||
Message: i18n.T(i18nk.BotMsgConfigInfoConflictStrategySet, map[string]any{
|
||||
"Strategy": conflictutil.Display(selected),
|
||||
}),
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
opts := tcbdata.ConflictStrategyValues()
|
||||
rows := make([]tg.KeyboardButtonRow, 0, len(opts))
|
||||
for _, opt := range opts {
|
||||
rows = append(rows, tg.KeyboardButtonRow{
|
||||
Buttons: []tg.KeyboardButtonClass{
|
||||
&tg.KeyboardButtonCallback{
|
||||
Text: conflictutil.Display(opt),
|
||||
Data: fmt.Appendf(nil, "%s %s %s", tcbdata.TypeConfig, "conflictst", opt),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
markup := &tg.ReplyInlineMarkup{Rows: rows}
|
||||
currentSt := conflictutil.EffectiveStrategy(user)
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
ID: update.CallbackQuery.GetMsgID(),
|
||||
Message: i18n.T(i18nk.BotMsgConfigPromptSelectConflictStrategy, map[string]any{
|
||||
"Strategy": conflictutil.Display(currentSt),
|
||||
}),
|
||||
ReplyMarkup: markup,
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
func handleConfigFnameTmpl(ctx *ext.Context, update *ext.Update) error {
|
||||
userID := update.GetUserChat().GetID()
|
||||
user, err := database.GetUserByChatID(ctx, userID)
|
||||
|
||||
55
client/bot/handlers/utils/conflictutil/conflict.go
Normal file
55
client/bot/handlers/utils/conflictutil/conflict.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package conflictutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/common/i18n"
|
||||
"github.com/krau/SaveAny-Bot/common/i18n/i18nk"
|
||||
"github.com/krau/SaveAny-Bot/database"
|
||||
"github.com/krau/SaveAny-Bot/pkg/tcbdata"
|
||||
)
|
||||
|
||||
const maxConflictLines = 10
|
||||
|
||||
func EffectiveStrategy(user *database.User) string {
|
||||
if user != nil && tcbdata.IsConflictStrategy(user.ConflictStrategy) {
|
||||
return user.ConflictStrategy
|
||||
}
|
||||
return tcbdata.ConflictStrategyRename
|
||||
}
|
||||
|
||||
func ResolveStrategy(user *database.User, override string) string {
|
||||
if tcbdata.IsConflictStrategy(override) {
|
||||
return override
|
||||
}
|
||||
return EffectiveStrategy(user)
|
||||
}
|
||||
|
||||
func Display(strategy string) string {
|
||||
switch strategy {
|
||||
case tcbdata.ConflictStrategyRename:
|
||||
return i18n.T(i18nk.BotMsgConfigConflictStrategyRename, nil)
|
||||
case tcbdata.ConflictStrategyAsk:
|
||||
return i18n.T(i18nk.BotMsgConfigConflictStrategyAsk, nil)
|
||||
case tcbdata.ConflictStrategyOverwrite:
|
||||
return i18n.T(i18nk.BotMsgConfigConflictStrategyOverwrite, nil)
|
||||
case tcbdata.ConflictStrategySkip:
|
||||
return i18n.T(i18nk.BotMsgConfigConflictStrategySkip, nil)
|
||||
default:
|
||||
return strategy
|
||||
}
|
||||
}
|
||||
|
||||
func FormatPaths(conflicts []string) string {
|
||||
if len(conflicts) <= maxConflictLines {
|
||||
return strings.Join(conflicts, "\n")
|
||||
}
|
||||
return strings.Join(conflicts[:maxConflictLines], "\n") + "\n" + i18n.T(i18nk.BotMsgCommonPromptConflictMoreFiles, map[string]any{
|
||||
"Count": len(conflicts) - maxConflictLines,
|
||||
})
|
||||
}
|
||||
|
||||
func FormatPath(storageName, storagePath string) string {
|
||||
return fmt.Sprintf("[%s]:%s", storageName, storagePath)
|
||||
}
|
||||
@@ -38,6 +38,8 @@ func BuildAddSelectStorageKeyboard(stors []storage.Storage, adddata tcbdata.Add)
|
||||
data := tcbdata.Add{
|
||||
TaskType: taskType,
|
||||
SelectedStorName: storage.Name(),
|
||||
SelectedDirPath: adddata.SelectedDirPath,
|
||||
ConflictStrategy: adddata.ConflictStrategy,
|
||||
|
||||
Files: adddata.Files,
|
||||
AsBatch: len(adddata.Files) > 1,
|
||||
@@ -109,6 +111,38 @@ func BuildAddOneSelectStorageMessage(ctx context.Context, stors []storage.Storag
|
||||
}, nil
|
||||
}
|
||||
|
||||
func BuildConflictStrategyMarkup(adddata tcbdata.Add) (*tg.ReplyInlineMarkup, error) {
|
||||
type option struct {
|
||||
text string
|
||||
strategy string
|
||||
}
|
||||
options := []option{
|
||||
{text: i18n.T(i18nk.BotMsgCommonButtonConflictRename, nil), strategy: tcbdata.ConflictStrategyRename},
|
||||
{text: i18n.T(i18nk.BotMsgCommonButtonConflictOverwrite, nil), strategy: tcbdata.ConflictStrategyOverwrite},
|
||||
{text: i18n.T(i18nk.BotMsgCommonButtonConflictSkip, nil), strategy: tcbdata.ConflictStrategySkip},
|
||||
}
|
||||
buttons := make([]tg.KeyboardButtonClass, 0, len(options))
|
||||
for _, opt := range options {
|
||||
data := adddata
|
||||
data.ConflictStrategy = opt.strategy
|
||||
dataid := xid.New().String()
|
||||
if err := cache.Set(dataid, data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buttons = append(buttons, &tg.KeyboardButtonCallback{
|
||||
Text: opt.text,
|
||||
Data: fmt.Appendf(nil, "%s %s", tcbdata.TypeAdd, dataid),
|
||||
})
|
||||
}
|
||||
rows := make([]tg.KeyboardButtonRow, 0, len(buttons))
|
||||
for _, button := range buttons {
|
||||
rows = append(rows, tg.KeyboardButtonRow{
|
||||
Buttons: []tg.KeyboardButtonClass{button},
|
||||
})
|
||||
}
|
||||
return &tg.ReplyInlineMarkup{Rows: rows}, nil
|
||||
}
|
||||
|
||||
// Builds the inline keyboard for setting default storage
|
||||
func BuildSetDefaultStorageMarkup(
|
||||
ctx context.Context,
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/celestix/gotgproto/ext"
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/gotd/td/tg"
|
||||
"github.com/krau/SaveAny-Bot/client/bot/handlers/utils/conflictutil"
|
||||
"github.com/krau/SaveAny-Bot/client/bot/handlers/utils/msgelem"
|
||||
"github.com/krau/SaveAny-Bot/client/bot/handlers/utils/ruleutil"
|
||||
"github.com/krau/SaveAny-Bot/common/i18n"
|
||||
@@ -17,14 +18,17 @@ import (
|
||||
"github.com/krau/SaveAny-Bot/core/tasks/batchtfile"
|
||||
tftask "github.com/krau/SaveAny-Bot/core/tasks/tfile"
|
||||
"github.com/krau/SaveAny-Bot/database"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/tasktype"
|
||||
"github.com/krau/SaveAny-Bot/pkg/tcbdata"
|
||||
"github.com/krau/SaveAny-Bot/pkg/tfile"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
"github.com/rs/xid"
|
||||
)
|
||||
|
||||
// 创建一个 tfile.TGFileTask 并添加到任务队列中, 以编辑消息的方式反馈结果
|
||||
func CreateAndAddTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor storage.Storage, dirPath string, file tfile.TGFileMessage, trackMsgID int) error {
|
||||
func CreateAndAddTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor storage.Storage, dirPath string, file tfile.TGFileMessage, trackMsgID int, conflictStrategy ...string) error {
|
||||
logger := log.FromContext(ctx)
|
||||
strategy := selectedConflictStrategy(conflictStrategy)
|
||||
user, err := database.GetUserByChatID(ctx, userID)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to get user by chat ID: %s", err)
|
||||
@@ -36,6 +40,7 @@ func CreateAndAddTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor storage
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
strategy = conflictutil.ResolveStrategy(user, strategy)
|
||||
if user.ApplyRule && user.Rules != nil {
|
||||
matched, matchedStorageName, matchedDirPath := ruleutil.ApplyRule(ctx, user.Rules, ruleutil.NewInput(file))
|
||||
if !matched {
|
||||
@@ -60,7 +65,26 @@ func CreateAndAddTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor storage
|
||||
}
|
||||
startCreateTask:
|
||||
storagePath := path.Join(dirPath, file.Name())
|
||||
if strategy == tcbdata.ConflictStrategyAsk || strategy == tcbdata.ConflictStrategySkip {
|
||||
exists := stor.Exists(ctx, storagePath)
|
||||
if exists && strategy == tcbdata.ConflictStrategyAsk {
|
||||
return promptTGFileConflictStrategy(ctx, userID, stor.Name(), dirPath, []tfile.TGFileMessage{file}, false, []string{conflictutil.FormatPath(stor.Name(), storagePath)}, trackMsgID)
|
||||
}
|
||||
if exists {
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
ID: trackMsgID,
|
||||
Message: i18n.T(i18nk.BotMsgCommonInfoAllConflictFilesSkipped, map[string]any{
|
||||
"Skipped": file.Name(),
|
||||
}),
|
||||
ReplyMarkup: nil,
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
}
|
||||
injectCtx := tgutil.ExtWithContext(ctx.Context, ctx)
|
||||
if strategy == tcbdata.ConflictStrategyOverwrite {
|
||||
injectCtx = storage.WithOverwrite(injectCtx)
|
||||
}
|
||||
taskid := xid.New().String()
|
||||
task, err := tftask.NewTGFileTask(taskid, injectCtx, file, stor, storagePath,
|
||||
tftask.NewProgressTrack(
|
||||
@@ -97,8 +121,9 @@ startCreateTask:
|
||||
}
|
||||
|
||||
// 创建一个 batchtfile.BatchTGFileTask 并添加到任务队列中, 以编辑消息的方式反馈结果
|
||||
func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor storage.Storage, dirPath string, files []tfile.TGFileMessage, trackMsgID int) error {
|
||||
func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor storage.Storage, dirPath string, files []tfile.TGFileMessage, trackMsgID int, conflictStrategy ...string) error {
|
||||
logger := log.FromContext(ctx)
|
||||
strategy := selectedConflictStrategy(conflictStrategy)
|
||||
user, err := database.GetUserByChatID(ctx, userID)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to get user by chat ID: %s", err)
|
||||
@@ -110,6 +135,7 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
strategy = conflictutil.ResolveStrategy(user, strategy)
|
||||
|
||||
useRule := user.ApplyRule && user.Rules != nil
|
||||
|
||||
@@ -128,14 +154,17 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
return storname, dirP
|
||||
}
|
||||
|
||||
skipped := make([]string, 0)
|
||||
conflicts := make([]string, 0)
|
||||
elems := make([]batchtfile.TaskElement, 0, len(files))
|
||||
type albumFile struct {
|
||||
file tfile.TGFileMessage
|
||||
storage storage.Storage
|
||||
dirPath string
|
||||
}
|
||||
albumFiles := make(map[int64][]albumFile, 0)
|
||||
for _, file := range files {
|
||||
storName, dirPath := applyRule(file)
|
||||
storName, matchedDirPath := applyRule(file)
|
||||
fileStor := stor
|
||||
if storName != stor.Name() && storName != "" {
|
||||
fileStor, err = storage.GetStorageByUserIDAndName(ctx, user.ChatID, storName)
|
||||
@@ -150,8 +179,19 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
}
|
||||
if !dirPath.NeedNewForAlbum() {
|
||||
storPath := path.Join(dirPath.String(), file.Name())
|
||||
if !matchedDirPath.NeedNewForAlbum() {
|
||||
storPath := path.Join(matchedDirPath.String(), file.Name())
|
||||
if strategy == tcbdata.ConflictStrategyAsk || strategy == tcbdata.ConflictStrategySkip {
|
||||
exists := fileStor.Exists(ctx, storPath)
|
||||
if exists && strategy == tcbdata.ConflictStrategyAsk {
|
||||
conflicts = append(conflicts, conflictutil.FormatPath(fileStor.Name(), storPath))
|
||||
continue
|
||||
}
|
||||
if exists {
|
||||
skipped = append(skipped, file.Name())
|
||||
continue
|
||||
}
|
||||
}
|
||||
elem, err := batchtfile.NewTaskElement(fileStor, storPath, file)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to create task element: %s", err)
|
||||
@@ -170,12 +210,17 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
logger.Warnf("File %s is not in a group, skipping album handling", file.Name())
|
||||
continue
|
||||
}
|
||||
fileDirPath := matchedDirPath.String()
|
||||
if matchedDirPath.NeedNewForAlbum() {
|
||||
fileDirPath = dirPath
|
||||
}
|
||||
if _, ok := albumFiles[groupId]; !ok {
|
||||
albumFiles[groupId] = make([]albumFile, 0)
|
||||
}
|
||||
albumFiles[groupId] = append(albumFiles[groupId], albumFile{
|
||||
file: file,
|
||||
storage: fileStor,
|
||||
dirPath: fileDirPath,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -188,7 +233,18 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
albumDir := strings.TrimSuffix(path.Base(afiles[0].file.Name()), path.Ext(afiles[0].file.Name()))
|
||||
albumStor := afiles[0].storage
|
||||
for _, af := range afiles {
|
||||
afstorPath := path.Join(dirPath, albumDir, af.file.Name())
|
||||
afstorPath := path.Join(af.dirPath, albumDir, af.file.Name())
|
||||
if strategy == tcbdata.ConflictStrategyAsk || strategy == tcbdata.ConflictStrategySkip {
|
||||
exists := albumStor.Exists(ctx, afstorPath)
|
||||
if exists && strategy == tcbdata.ConflictStrategyAsk {
|
||||
conflicts = append(conflicts, conflictutil.FormatPath(albumStor.Name(), afstorPath))
|
||||
continue
|
||||
}
|
||||
if exists {
|
||||
skipped = append(skipped, af.file.Name())
|
||||
continue
|
||||
}
|
||||
}
|
||||
elem, err := batchtfile.NewTaskElement(albumStor, afstorPath, af.file)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to create task element for album file: %s", err)
|
||||
@@ -204,9 +260,26 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
}
|
||||
}
|
||||
|
||||
if strategy == tcbdata.ConflictStrategyAsk && len(conflicts) > 0 {
|
||||
return promptTGFileConflictStrategy(ctx, userID, stor.Name(), dirPath, files, true, conflicts, trackMsgID)
|
||||
}
|
||||
|
||||
injectCtx := tgutil.ExtWithContext(ctx.Context, ctx)
|
||||
if strategy == tcbdata.ConflictStrategyOverwrite {
|
||||
injectCtx = storage.WithOverwrite(injectCtx)
|
||||
}
|
||||
if len(elems) == 0 {
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
ID: trackMsgID,
|
||||
Message: i18n.T(i18nk.BotMsgCommonInfoAllConflictFilesSkipped, map[string]any{
|
||||
"Skipped": strings.Join(skipped, "\n"),
|
||||
}),
|
||||
ReplyMarkup: nil,
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
taskid := xid.New().String()
|
||||
task := batchtfile.NewBatchTGFileTask(taskid, injectCtx, elems, batchtfile.NewProgressTracker(trackMsgID, userID), true)
|
||||
task := batchtfile.NewBatchTGFileTask(taskid, injectCtx, elems, batchtfile.NewProgressTrackerWithSkipped(trackMsgID, userID, skipped), true)
|
||||
if err := core.AddTask(injectCtx, task); err != nil {
|
||||
logger.Errorf("Failed to add batch task: %s", err)
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
@@ -218,11 +291,48 @@ func CreateAndAddBatchTGFileTaskWithEdit(ctx *ext.Context, userID int64, stor st
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
ID: trackMsgID,
|
||||
Message: i18n.T(i18nk.BotMsgCommonInfoBatchTasksAdded, map[string]any{
|
||||
"Count": len(files),
|
||||
}),
|
||||
ID: trackMsgID,
|
||||
Message: buildBatchAddedMessage(len(elems), skipped),
|
||||
ReplyMarkup: nil,
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
func promptTGFileConflictStrategy(ctx *ext.Context, userID int64, storageName, dirPath string, files []tfile.TGFileMessage, asBatch bool, conflicts []string, trackMsgID int) error {
|
||||
markup, err := msgelem.BuildConflictStrategyMarkup(tcbdata.Add{
|
||||
TaskType: tasktype.TaskTypeTgfiles,
|
||||
SelectedStorName: storageName,
|
||||
SettedDir: true,
|
||||
SelectedDirPath: dirPath,
|
||||
Files: files,
|
||||
AsBatch: asBatch,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.EditMessage(userID, &tg.MessagesEditMessageRequest{
|
||||
ID: trackMsgID,
|
||||
Message: i18n.T(i18nk.BotMsgCommonPromptSelectConflictStrategy, map[string]any{"Files": conflictutil.FormatPaths(conflicts)}),
|
||||
ReplyMarkup: markup,
|
||||
})
|
||||
return dispatcher.EndGroups
|
||||
}
|
||||
|
||||
func selectedConflictStrategy(strategies []string) string {
|
||||
if len(strategies) == 0 {
|
||||
return ""
|
||||
}
|
||||
return strategies[0]
|
||||
}
|
||||
|
||||
func buildBatchAddedMessage(count int, skipped []string) string {
|
||||
if len(skipped) == 0 {
|
||||
return i18n.T(i18nk.BotMsgCommonInfoBatchTasksAdded, map[string]any{
|
||||
"Count": count,
|
||||
})
|
||||
}
|
||||
return i18n.T(i18nk.BotMsgCommonInfoBatchTasksAddedWithSkipped, map[string]any{
|
||||
"Count": count,
|
||||
"Skipped": strings.Join(skipped, "\n"),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
// https://github.com/iyear/tdl/blob/master/core/tclient/tclient.go
|
||||
func NewDefaultMiddlewares(ctx context.Context, timeout time.Duration) []telegram.Middleware {
|
||||
return []telegram.Middleware{
|
||||
recovery.New(ctx, newBackoff(timeout)),
|
||||
recovery.New(ctx, func() backoff.BackOff { return newBackoff(timeout) }),
|
||||
retry.New(config.C().Telegram.RpcRetry),
|
||||
floodwait.NewSimpleWaiter(),
|
||||
}
|
||||
|
||||
@@ -14,19 +14,28 @@ import (
|
||||
)
|
||||
|
||||
type recovery struct {
|
||||
ctx context.Context
|
||||
backoff backoff.BackOff
|
||||
ctx context.Context
|
||||
newBackoff func() backoff.BackOff
|
||||
}
|
||||
|
||||
func New(ctx context.Context, backoff backoff.BackOff) telegram.Middleware {
|
||||
// New returns a recovery middleware.
|
||||
//
|
||||
// newBackoff is a factory that must return a fresh backoff.BackOff on every call: backoff implementations in
|
||||
// cenkalti/backoff/v4 (notably ExponentialBackOff) are not safe for concurrent
|
||||
// use, and the Telegram client invokes RPCs from many goroutines in parallel.
|
||||
//
|
||||
// Sharing a single instance corrupts its internal counters, breaks the
|
||||
// exponential interval, and defeats MaxElapsedTime - see issue #218.
|
||||
func New(ctx context.Context, newBackoff func() backoff.BackOff) telegram.Middleware {
|
||||
return &recovery{
|
||||
ctx: ctx,
|
||||
backoff: backoff,
|
||||
ctx: ctx,
|
||||
newBackoff: newBackoff,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recovery) Handle(next tg.Invoker) telegram.InvokeFunc {
|
||||
return func(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
|
||||
b := r.newBackoff()
|
||||
|
||||
return backoff.RetryNotify(func() error {
|
||||
if err := next.Invoke(ctx, input, output); err != nil {
|
||||
@@ -38,7 +47,7 @@ func (r *recovery) Handle(next tg.Invoker) telegram.InvokeFunc {
|
||||
}
|
||||
|
||||
return nil
|
||||
}, r.backoff, func(err error, duration time.Duration) {
|
||||
}, b, func(err error, duration time.Duration) {
|
||||
log.FromContext(ctx).Debug("Wait for connection recovery", "error", err, "duration", duration)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/cmd/upload"
|
||||
"github.com/krau/SaveAny-Bot/cmd/watch"
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
@@ -18,6 +19,7 @@ var rootCmd = &cobra.Command{
|
||||
func init() {
|
||||
config.RegisterFlags(rootCmd)
|
||||
upload.Register(rootCmd)
|
||||
watch.Register(rootCmd)
|
||||
}
|
||||
|
||||
func Execute(ctx context.Context) {
|
||||
|
||||
145
cmd/watch/cmd.go
Normal file
145
cmd/watch/cmd.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package watch
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/krau/SaveAny-Bot/client/bot"
|
||||
"github.com/krau/SaveAny-Bot/common/cache"
|
||||
"github.com/krau/SaveAny-Bot/common/i18n"
|
||||
"github.com/krau/SaveAny-Bot/common/utils/tgutil"
|
||||
"github.com/krau/SaveAny-Bot/config"
|
||||
"github.com/krau/SaveAny-Bot/database"
|
||||
stortype "github.com/krau/SaveAny-Bot/pkg/enums/storage"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var watchCmd = &cobra.Command{
|
||||
Use: "watch",
|
||||
Short: "watch a local directory and auto-upload changed files to storage",
|
||||
Long: `Watch a local directory and automatically upload created or modified files
|
||||
to the specified storage backend, preserving the relative directory structure.
|
||||
|
||||
Example:
|
||||
saveany-bot watch -p /data/inbox -s mystorage -d backup --recursive`,
|
||||
RunE: runWatch,
|
||||
}
|
||||
|
||||
func Register(root *cobra.Command) {
|
||||
flags := watchCmd.Flags()
|
||||
flags.StringP("path", "p", "", "local directory path to watch")
|
||||
watchCmd.MarkFlagRequired("path")
|
||||
flags.StringP("storage", "s", "", "storage name to upload to")
|
||||
watchCmd.MarkFlagRequired("storage")
|
||||
flags.StringP("dir", "d", "", "storage dir to upload to, default is the base_path of the storage")
|
||||
flags.BoolP("recursive", "r", false, "watch subdirectories recursively")
|
||||
flags.Bool("overwrite", false, "overwrite existing files on storage instead of skipping")
|
||||
flags.Bool("initial-scan", false, "upload existing files in the directory on startup")
|
||||
flags.Duration("debounce", 2*time.Second, "wait time after the last change before uploading a file")
|
||||
flags.Int("upload-workers", 0, "number of concurrent uploads, default is config.workers")
|
||||
flags.Duration("retry-delay", 3*time.Second, "delay between upload retries")
|
||||
root.AddCommand(watchCmd)
|
||||
}
|
||||
|
||||
func runWatch(cmd *cobra.Command, _ []string) error {
|
||||
watchPath, err := cmd.Flags().GetString("path")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storName, err := cmd.Flags().GetString("storage")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
destDir, err := cmd.Flags().GetString("dir")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
recursive, err := cmd.Flags().GetBool("recursive")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
overwrite, err := cmd.Flags().GetBool("overwrite")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
initialScan, err := cmd.Flags().GetBool("initial-scan")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
debounce, err := cmd.Flags().GetDuration("debounce")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uploadWorkers, err := cmd.Flags().GetInt("upload-workers")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
retryDelay, err := cmd.Flags().GetDuration("retry-delay")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := cmd.Context()
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
configFile := config.GetConfigFile(cmd)
|
||||
if err := config.Init(ctx, configFile); err != nil {
|
||||
return fmt.Errorf("failed to load config: %w", err)
|
||||
}
|
||||
i18n.Init(config.C().Lang)
|
||||
cache.Init()
|
||||
database.Init(ctx)
|
||||
|
||||
stor, err := storage.GetStorageByName(ctx, storName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get storage %q: %w", storName, err)
|
||||
}
|
||||
|
||||
// Telegram storage needs the bot client and its ext context injected into ctx.
|
||||
if stor.Type() == stortype.Telegram {
|
||||
bot.Init(ctx)
|
||||
ctx = tgutil.ExtWithContext(ctx, bot.ExtContext())
|
||||
}
|
||||
|
||||
if uploadWorkers < 1 {
|
||||
uploadWorkers = config.C().Workers
|
||||
}
|
||||
|
||||
uploader := NewUploader(ctx, UploaderOptions{
|
||||
Storage: stor,
|
||||
DestDir: destDir,
|
||||
Overwrite: overwrite,
|
||||
Workers: uploadWorkers,
|
||||
Retry: config.C().Retry,
|
||||
RetryDelay: retryDelay,
|
||||
})
|
||||
|
||||
watcher, err := NewWatcher(ctx, WatcherOptions{
|
||||
Root: watchPath,
|
||||
Recursive: recursive,
|
||||
Debounce: debounce,
|
||||
Uploader: uploader,
|
||||
})
|
||||
if err != nil {
|
||||
uploader.Close()
|
||||
return fmt.Errorf("failed to create watcher: %w", err)
|
||||
}
|
||||
|
||||
if initialScan {
|
||||
watcher.ScanExisting(ctx)
|
||||
}
|
||||
|
||||
logger.Infof("watch started: %s -> storage %q dir %q", watchPath, storName, destDir)
|
||||
|
||||
// Run blocks until ctx is cancelled (e.g. SIGINT).
|
||||
runErr := watcher.Run(ctx)
|
||||
|
||||
// Wait for in-flight uploads to finish before exiting.
|
||||
logger.Info("waiting for in-flight uploads to finish...")
|
||||
uploader.Close()
|
||||
logger.Info("watch stopped")
|
||||
|
||||
return runErr
|
||||
}
|
||||
227
cmd/watch/uploader.go
Normal file
227
cmd/watch/uploader.go
Normal file
@@ -0,0 +1,227 @@
|
||||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
|
||||
"github.com/krau/SaveAny-Bot/storage"
|
||||
)
|
||||
|
||||
type uploadJob struct {
|
||||
// localPath is the absolute path of the local file.
|
||||
localPath string
|
||||
// relPath is relative to the watch root, used to preserve directory structure on storage.
|
||||
relPath string
|
||||
}
|
||||
|
||||
// Uploader uploads local files to the target storage via a worker pool.
|
||||
// If a file changes while being uploaded, it is re-uploaded once after the
|
||||
// current upload finishes, instead of being queued multiple times.
|
||||
type Uploader struct {
|
||||
stor storage.Storage
|
||||
destDir string
|
||||
overwrite bool
|
||||
retry int
|
||||
retryDelay time.Duration
|
||||
logger *log.Logger
|
||||
|
||||
jobs chan uploadJob
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.Mutex
|
||||
// inflight maps in-progress (or queued) file paths. A true value means the
|
||||
// file changed during upload and must be re-queued once done.
|
||||
inflight map[string]bool
|
||||
}
|
||||
|
||||
type UploaderOptions struct {
|
||||
Storage storage.Storage
|
||||
DestDir string
|
||||
Overwrite bool
|
||||
Workers int
|
||||
Retry int
|
||||
RetryDelay time.Duration
|
||||
QueueSize int
|
||||
}
|
||||
|
||||
// NewUploader creates and starts an Uploader. The caller must call Close when done.
|
||||
func NewUploader(ctx context.Context, opts UploaderOptions) *Uploader {
|
||||
if opts.Workers < 1 {
|
||||
opts.Workers = 1
|
||||
}
|
||||
if opts.Retry < 1 {
|
||||
opts.Retry = 1
|
||||
}
|
||||
if opts.RetryDelay <= 0 {
|
||||
opts.RetryDelay = 3 * time.Second
|
||||
}
|
||||
if opts.QueueSize < opts.Workers {
|
||||
opts.QueueSize = opts.Workers * 64
|
||||
}
|
||||
|
||||
u := &Uploader{
|
||||
stor: opts.Storage,
|
||||
destDir: opts.DestDir,
|
||||
overwrite: opts.Overwrite,
|
||||
retry: opts.Retry,
|
||||
retryDelay: opts.RetryDelay,
|
||||
logger: log.FromContext(ctx).WithPrefix("uploader"),
|
||||
jobs: make(chan uploadJob, opts.QueueSize),
|
||||
inflight: make(map[string]bool),
|
||||
}
|
||||
|
||||
for i := 0; i < opts.Workers; i++ {
|
||||
u.wg.Add(1)
|
||||
go u.worker(ctx)
|
||||
}
|
||||
|
||||
return u
|
||||
}
|
||||
|
||||
// Submit enqueues an upload job. If the same file is already in flight, it is
|
||||
// marked for re-upload instead of being queued again. Returns false if ctx is
|
||||
// cancelled before the job can be enqueued.
|
||||
func (u *Uploader) Submit(ctx context.Context, job uploadJob) bool {
|
||||
u.mu.Lock()
|
||||
if _, ok := u.inflight[job.localPath]; ok {
|
||||
u.inflight[job.localPath] = true
|
||||
u.mu.Unlock()
|
||||
u.logger.Debugf("file %s already in flight, marked for re-upload", job.localPath)
|
||||
return true
|
||||
}
|
||||
u.inflight[job.localPath] = false
|
||||
u.mu.Unlock()
|
||||
|
||||
select {
|
||||
case u.jobs <- job:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
u.mu.Lock()
|
||||
delete(u.inflight, job.localPath)
|
||||
u.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Uploader) worker(ctx context.Context) {
|
||||
defer u.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case job, ok := <-u.jobs:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
u.process(ctx, job)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Uploader) process(ctx context.Context, job uploadJob) {
|
||||
if err := u.uploadWithRetry(ctx, job); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
u.clearInflight(job.localPath)
|
||||
return
|
||||
}
|
||||
u.logger.Errorf("failed to upload %s after %d attempt(s): %v", job.localPath, u.retry, err)
|
||||
}
|
||||
|
||||
// Re-queue if the file changed again while it was being uploaded.
|
||||
u.mu.Lock()
|
||||
needReupload := u.inflight[job.localPath]
|
||||
if needReupload {
|
||||
u.inflight[job.localPath] = false
|
||||
} else {
|
||||
delete(u.inflight, job.localPath)
|
||||
}
|
||||
u.mu.Unlock()
|
||||
|
||||
if needReupload {
|
||||
select {
|
||||
case u.jobs <- job:
|
||||
u.logger.Debugf("re-queued %s due to changes during upload", job.localPath)
|
||||
case <-ctx.Done():
|
||||
u.clearInflight(job.localPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Uploader) clearInflight(localPath string) {
|
||||
u.mu.Lock()
|
||||
delete(u.inflight, localPath)
|
||||
u.mu.Unlock()
|
||||
}
|
||||
|
||||
func (u *Uploader) uploadWithRetry(ctx context.Context, job uploadJob) error {
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= u.retry; attempt++ {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
err := u.upload(ctx, job)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
lastErr = err
|
||||
u.logger.Warnf("upload %s failed (attempt %d/%d): %v", job.localPath, attempt, u.retry, err)
|
||||
if attempt < u.retry {
|
||||
select {
|
||||
case <-time.After(u.retryDelay):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (u *Uploader) upload(ctx context.Context, job uploadJob) error {
|
||||
file, err := os.Open(filepath.Clean(job.localPath))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
info, err := file.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to stat file: %w", err)
|
||||
}
|
||||
if info.IsDir() {
|
||||
return fmt.Errorf("path is a directory, not a file")
|
||||
}
|
||||
|
||||
// Keep the relative directory structure on the storage side.
|
||||
storagePath := path.Join(u.destDir, filepath.ToSlash(job.relPath))
|
||||
|
||||
uploadCtx := context.WithValue(ctx, ctxkey.ContentLength, info.Size())
|
||||
if u.overwrite {
|
||||
uploadCtx = storage.WithOverwrite(uploadCtx)
|
||||
} else if u.stor.Exists(uploadCtx, storagePath) {
|
||||
u.logger.Infof("skip existing file: %s", storagePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
u.logger.Infof("uploading %s -> %s (%d bytes)", job.localPath, storagePath, info.Size())
|
||||
if err := u.stor.Save(uploadCtx, file, storagePath); err != nil {
|
||||
return fmt.Errorf("failed to save to storage: %w", err)
|
||||
}
|
||||
u.logger.Infof("uploaded %s", storagePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close stops accepting jobs and waits for in-flight uploads to finish.
|
||||
func (u *Uploader) Close() {
|
||||
close(u.jobs)
|
||||
u.wg.Wait()
|
||||
}
|
||||
269
cmd/watch/watcher.go
Normal file
269
cmd/watch/watcher.go
Normal file
@@ -0,0 +1,269 @@
|
||||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
// Watcher watches a local directory and submits stable files to the Uploader.
|
||||
//
|
||||
// Write-completion detection: fsnotify emits Write events throughout a write.
|
||||
// Watcher debounces per file and only uploads once the file size stays
|
||||
// unchanged across a debounce window, avoiding uploads of partial files.
|
||||
type Watcher struct {
|
||||
root string
|
||||
recursive bool
|
||||
debounce time.Duration
|
||||
uploader *Uploader
|
||||
logger *log.Logger
|
||||
|
||||
fsw *fsnotify.Watcher
|
||||
|
||||
mu sync.Mutex
|
||||
pending map[string]*time.Timer
|
||||
// lastSize is the last observed file size, used to detect a stable write.
|
||||
lastSize map[string]int64
|
||||
}
|
||||
|
||||
type WatcherOptions struct {
|
||||
Root string
|
||||
Recursive bool
|
||||
Debounce time.Duration
|
||||
Uploader *Uploader
|
||||
}
|
||||
|
||||
// NewWatcher creates a Watcher.
|
||||
func NewWatcher(ctx context.Context, opts WatcherOptions) (*Watcher, error) {
|
||||
if opts.Debounce <= 0 {
|
||||
opts.Debounce = 2 * time.Second
|
||||
}
|
||||
root, err := filepath.Abs(opts.Root)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to resolve root path: %w", err)
|
||||
}
|
||||
info, err := os.Stat(root)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat root path: %w", err)
|
||||
}
|
||||
if !info.IsDir() {
|
||||
return nil, fmt.Errorf("watch path must be a directory: %s", root)
|
||||
}
|
||||
|
||||
fsw, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create fsnotify watcher: %w", err)
|
||||
}
|
||||
|
||||
w := &Watcher{
|
||||
root: root,
|
||||
recursive: opts.Recursive,
|
||||
debounce: opts.Debounce,
|
||||
uploader: opts.Uploader,
|
||||
logger: log.FromContext(ctx).WithPrefix("watcher"),
|
||||
fsw: fsw,
|
||||
pending: make(map[string]*time.Timer),
|
||||
lastSize: make(map[string]int64),
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// Run starts watching and blocks until ctx is cancelled.
|
||||
func (w *Watcher) Run(ctx context.Context) error {
|
||||
if err := w.addDir(w.root); err != nil {
|
||||
w.fsw.Close()
|
||||
return fmt.Errorf("failed to watch root: %w", err)
|
||||
}
|
||||
w.logger.Infof("watching %s (recursive=%v, debounce=%s)", w.root, w.recursive, w.debounce)
|
||||
|
||||
defer w.cleanup()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
w.logger.Info("stopping watcher")
|
||||
return nil
|
||||
case event, ok := <-w.fsw.Events:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
w.handleEvent(ctx, event)
|
||||
case err, ok := <-w.fsw.Errors:
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
w.logger.Errorf("watch error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Watcher) handleEvent(ctx context.Context, event fsnotify.Event) {
|
||||
// Remove/Rename: cancel any pending upload for this path.
|
||||
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
|
||||
w.cancelPending(event.Name)
|
||||
return
|
||||
}
|
||||
|
||||
if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Write) {
|
||||
return
|
||||
}
|
||||
|
||||
info, err := os.Stat(event.Name)
|
||||
if err != nil {
|
||||
// File may have been removed or moved; ignore.
|
||||
return
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
// New directory: watch it recursively and scan files already inside.
|
||||
if event.Has(fsnotify.Create) && w.recursive {
|
||||
if err := w.addDir(event.Name); err != nil {
|
||||
w.logger.Errorf("failed to watch new dir %s: %v", event.Name, err)
|
||||
}
|
||||
w.scanExisting(ctx, event.Name)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
w.scheduleUpload(ctx, event.Name)
|
||||
}
|
||||
|
||||
// scheduleUpload schedules a debounced upload for a file.
|
||||
func (w *Watcher) scheduleUpload(ctx context.Context, file string) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if t, ok := w.pending[file]; ok {
|
||||
t.Stop()
|
||||
}
|
||||
w.pending[file] = time.AfterFunc(w.debounce, func() {
|
||||
w.maybeUpload(ctx, file)
|
||||
})
|
||||
}
|
||||
|
||||
// maybeUpload submits the upload once the debounce window passes and the file
|
||||
// size is stable; otherwise it waits another window.
|
||||
func (w *Watcher) maybeUpload(ctx context.Context, file string) {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
info, err := os.Stat(file)
|
||||
if err != nil {
|
||||
w.cancelPending(file)
|
||||
return
|
||||
}
|
||||
if info.IsDir() {
|
||||
w.cancelPending(file)
|
||||
return
|
||||
}
|
||||
|
||||
w.mu.Lock()
|
||||
prevSize, seen := w.lastSize[file]
|
||||
curSize := info.Size()
|
||||
if !seen || prevSize != curSize {
|
||||
// Size still changing: likely still being written, wait another window.
|
||||
w.lastSize[file] = curSize
|
||||
w.pending[file] = time.AfterFunc(w.debounce, func() {
|
||||
w.maybeUpload(ctx, file)
|
||||
})
|
||||
w.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Size stable: treat write as complete.
|
||||
delete(w.pending, file)
|
||||
delete(w.lastSize, file)
|
||||
w.mu.Unlock()
|
||||
|
||||
relPath, err := filepath.Rel(w.root, file)
|
||||
if err != nil {
|
||||
w.logger.Errorf("failed to compute relative path for %s: %v", file, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.uploader.Submit(ctx, uploadJob{localPath: file, relPath: relPath})
|
||||
}
|
||||
|
||||
func (w *Watcher) cancelPending(file string) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if t, ok := w.pending[file]; ok {
|
||||
t.Stop()
|
||||
delete(w.pending, file)
|
||||
}
|
||||
delete(w.lastSize, file)
|
||||
}
|
||||
|
||||
// addDir adds a directory to the watch list, recursively when enabled.
|
||||
func (w *Watcher) addDir(dir string) error {
|
||||
if !w.recursive {
|
||||
return w.fsw.Add(dir)
|
||||
}
|
||||
return filepath.WalkDir(dir, func(p string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
w.logger.Warnf("skip path %s: %v", p, err)
|
||||
return nil
|
||||
}
|
||||
if d.IsDir() {
|
||||
if addErr := w.fsw.Add(p); addErr != nil {
|
||||
w.logger.Warnf("failed to watch dir %s: %v", p, addErr)
|
||||
} else {
|
||||
w.logger.Debugf("watching dir %s", p)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// scanExisting submits files already present under dir (initial sync and new-dir backfill).
|
||||
func (w *Watcher) scanExisting(ctx context.Context, dir string) {
|
||||
walkFn := func(p string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
w.logger.Warnf("skip path %s: %v", p, err)
|
||||
return nil
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if d.IsDir() {
|
||||
if !w.recursive && p != dir {
|
||||
return fs.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
relPath, relErr := filepath.Rel(w.root, p)
|
||||
if relErr != nil {
|
||||
w.logger.Errorf("failed to compute relative path for %s: %v", p, relErr)
|
||||
return nil
|
||||
}
|
||||
w.uploader.Submit(ctx, uploadJob{localPath: p, relPath: relPath})
|
||||
return nil
|
||||
}
|
||||
if err := filepath.WalkDir(dir, walkFn); err != nil && ctx.Err() == nil {
|
||||
w.logger.Errorf("failed to scan dir %s: %v", dir, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ScanExisting triggers a one-time scan and upload of existing files under the watch root.
|
||||
func (w *Watcher) ScanExisting(ctx context.Context) {
|
||||
w.logger.Info("scanning existing files for initial sync")
|
||||
w.scanExisting(ctx, w.root)
|
||||
}
|
||||
|
||||
func (w *Watcher) cleanup() {
|
||||
w.mu.Lock()
|
||||
for _, t := range w.pending {
|
||||
t.Stop()
|
||||
}
|
||||
w.pending = make(map[string]*time.Timer)
|
||||
w.lastSize = make(map[string]int64)
|
||||
w.mu.Unlock()
|
||||
w.fsw.Close()
|
||||
}
|
||||
@@ -36,6 +36,9 @@ const (
|
||||
BotMsgCmdUpdate Key = "bot.msg.cmd.update"
|
||||
BotMsgCmdWatch Key = "bot.msg.cmd.watch"
|
||||
BotMsgCmdYtdlp Key = "bot.msg.cmd.ytdlp"
|
||||
BotMsgCommonButtonConflictOverwrite Key = "bot.msg.common.button_conflict_overwrite"
|
||||
BotMsgCommonButtonConflictRename Key = "bot.msg.common.button_conflict_rename"
|
||||
BotMsgCommonButtonConflictSkip Key = "bot.msg.common.button_conflict_skip"
|
||||
BotMsgCommonCancelButtonText Key = "bot.msg.common.cancel_button_text"
|
||||
BotMsgCommonErrorBuildDirSelectKeyboardFailed Key = "bot.msg.common.error_build_dir_select_keyboard_failed"
|
||||
BotMsgCommonErrorBuildStorageSelectKeyboardFailed Key = "bot.msg.common.error_build_storage_select_keyboard_failed"
|
||||
@@ -63,7 +66,10 @@ const (
|
||||
BotMsgCommonErrorTaskAddFailed Key = "bot.msg.common.error_task_add_failed"
|
||||
BotMsgCommonErrorTaskCreateFailed Key = "bot.msg.common.error_task_create_failed"
|
||||
BotMsgCommonErrorUpdateUserInfoFailed Key = "bot.msg.common.error_update_user_info_failed"
|
||||
BotMsgCommonInfoAllConflictFilesSkipped Key = "bot.msg.common.info_all_conflict_files_skipped"
|
||||
BotMsgCommonInfoBatchTasksAdded Key = "bot.msg.common.info_batch_tasks_added"
|
||||
BotMsgCommonInfoBatchTasksAddedWithSkipped Key = "bot.msg.common.info_batch_tasks_added_with_skipped"
|
||||
BotMsgCommonInfoConflictFilesSkipped Key = "bot.msg.common.info_conflict_files_skipped"
|
||||
BotMsgCommonInfoDefaultStorageSet Key = "bot.msg.common.info_default_storage_set"
|
||||
BotMsgCommonInfoDefaultStorageWithDirSet Key = "bot.msg.common.info_default_storage_with_dir_set"
|
||||
BotMsgCommonInfoFetchingFileInfo Key = "bot.msg.common.info_fetching_file_info"
|
||||
@@ -73,16 +79,25 @@ const (
|
||||
BotMsgCommonInfoSilentModeOff Key = "bot.msg.common.info_silent_mode_off"
|
||||
BotMsgCommonInfoSilentModeOn Key = "bot.msg.common.info_silent_mode_on"
|
||||
BotMsgCommonInfoTaskAdded Key = "bot.msg.common.info_task_added"
|
||||
BotMsgCommonPromptConflictMoreFiles Key = "bot.msg.common.prompt_conflict_more_files"
|
||||
BotMsgCommonPromptSelectConflictStrategy Key = "bot.msg.common.prompt_select_conflict_strategy"
|
||||
BotMsgCommonPromptSelectDefaultDir Key = "bot.msg.common.prompt_select_default_dir"
|
||||
BotMsgCommonPromptSelectDefaultStorage Key = "bot.msg.common.prompt_select_default_storage"
|
||||
BotMsgCommonPromptSelectDir Key = "bot.msg.common.prompt_select_dir"
|
||||
BotMsgConfigButtonFilenameStrategy Key = "bot.msg.config.button_filename_strategy"
|
||||
BotMsgConfigButtonConflictStrategy Key = "bot.msg.config.button_conflict_strategy"
|
||||
BotMsgConfigConflictStrategyAsk Key = "bot.msg.config.conflict_strategy_ask"
|
||||
BotMsgConfigConflictStrategyOverwrite Key = "bot.msg.config.conflict_strategy_overwrite"
|
||||
BotMsgConfigConflictStrategyRename Key = "bot.msg.config.conflict_strategy_rename"
|
||||
BotMsgConfigConflictStrategySkip Key = "bot.msg.config.conflict_strategy_skip"
|
||||
BotMsgConfigErrorInvalidCallbackData Key = "bot.msg.config.error_invalid_callback_data"
|
||||
BotMsgConfigErrorInvalidTemplate Key = "bot.msg.config.error_invalid_template"
|
||||
BotMsgConfigFnametmplHelp Key = "bot.msg.config.fnametmpl_help"
|
||||
BotMsgConfigInfoCurrentTemplatePrefix Key = "bot.msg.config.info_current_template_prefix"
|
||||
BotMsgConfigInfoConflictStrategySet Key = "bot.msg.config.info_conflict_strategy_set"
|
||||
BotMsgConfigInfoFilenameStrategySet Key = "bot.msg.config.info_filename_strategy_set"
|
||||
BotMsgConfigInfoTemplateUpdated Key = "bot.msg.config.info_template_updated"
|
||||
BotMsgConfigPromptSelectConflictStrategy Key = "bot.msg.config.prompt_select_conflict_strategy"
|
||||
BotMsgConfigPromptSelectFilenameStrategy Key = "bot.msg.config.prompt_select_filename_strategy"
|
||||
BotMsgConfigPromptSelectOption Key = "bot.msg.config.prompt_select_option"
|
||||
BotMsgDirButtonDefault Key = "bot.msg.dir.button_default"
|
||||
|
||||
@@ -112,9 +112,17 @@ bot:
|
||||
error_task_add_failed: "Failed to add task: {{.Error}}"
|
||||
info_task_added: "Task added"
|
||||
info_batch_tasks_added: "Batch tasks added, total {{.Count}} files"
|
||||
info_batch_tasks_added_with_skipped: "Batch tasks added, total {{.Count}} files\nSkipped conflicting files:\n{{.Skipped}}"
|
||||
info_all_conflict_files_skipped: "All conflicting files were skipped:\n{{.Skipped}}"
|
||||
info_conflict_files_skipped: "Skipped conflicting files:\n{{.Skipped}}"
|
||||
error_task_create_failed: "Failed to create task: {{.Error}}"
|
||||
error_get_dir_failed: "Failed to get directory: {{.Error}}"
|
||||
prompt_select_dir: "Please select a directory to store to"
|
||||
prompt_select_conflict_strategy: "Files with the same name already exist. Please select a save strategy:\n{{.Files}}"
|
||||
prompt_conflict_more_files: "...and {{.Count}} more files"
|
||||
button_conflict_rename: "Rename"
|
||||
button_conflict_overwrite: "Overwrite"
|
||||
button_conflict_skip: "Skip"
|
||||
prompt_select_default_dir: "Please select a default directory to save to"
|
||||
info_default_storage_set: "Default storage set to: {{.Name}}"
|
||||
info_default_storage_with_dir_set: "Default storage set to: {{.Name}}:/{{.Dir}}"
|
||||
@@ -266,10 +274,17 @@ bot:
|
||||
config:
|
||||
prompt_select_option: "Please select an option to configure"
|
||||
button_filename_strategy: "Filename strategy"
|
||||
button_conflict_strategy: "Duplicate file strategy"
|
||||
error_invalid_callback_data: "Invalid callback data"
|
||||
error_invalid_template: "Invalid template, please check syntax\n{{.Error}}"
|
||||
info_filename_strategy_set: "Filename strategy set to: {{.Strategy}}"
|
||||
info_conflict_strategy_set: "Duplicate file strategy set to: {{.Strategy}}"
|
||||
prompt_select_filename_strategy: "Please select filename strategy, current strategy: {{.Strategy}}"
|
||||
prompt_select_conflict_strategy: "Please select duplicate file strategy, current strategy: {{.Strategy}}"
|
||||
conflict_strategy_rename: "Always rename"
|
||||
conflict_strategy_ask: "Ask every time"
|
||||
conflict_strategy_overwrite: "Always overwrite"
|
||||
conflict_strategy_skip: "Always skip"
|
||||
fnametmpl_help: |-
|
||||
Use this command to set filename template, for example:
|
||||
/fnametmpl Image_{{"{{.msgid}}"}}_{{"{{.msgdate}}"}}.jpg
|
||||
|
||||
@@ -113,9 +113,17 @@ bot:
|
||||
error_task_add_failed: "任务添加失败: {{.Error}}"
|
||||
info_task_added: "任务已添加"
|
||||
info_batch_tasks_added: "已添加批量任务, 共 {{.Count}} 个文件"
|
||||
info_batch_tasks_added_with_skipped: "已添加批量任务, 共 {{.Count}} 个文件\n已跳过同名文件:\n{{.Skipped}}"
|
||||
info_all_conflict_files_skipped: "全部同名文件已跳过:\n{{.Skipped}}"
|
||||
info_conflict_files_skipped: "已跳过同名文件:\n{{.Skipped}}"
|
||||
error_task_create_failed: "任务创建失败: {{.Error}}"
|
||||
error_get_dir_failed: "获取目录失败: {{.Error}}"
|
||||
prompt_select_dir: "请选择要存储到的目录"
|
||||
prompt_select_conflict_strategy: "检测到同名文件, 请选择保存策略:\n{{.Files}}"
|
||||
prompt_conflict_more_files: "...还有 {{.Count}} 个文件"
|
||||
button_conflict_rename: "重命名"
|
||||
button_conflict_overwrite: "覆盖"
|
||||
button_conflict_skip: "跳过"
|
||||
prompt_select_default_dir: "请选择要保存到的默认文件夹"
|
||||
info_default_storage_set: "已将默认存储位置设为: {{.Name}}"
|
||||
info_default_storage_with_dir_set: "已将默认存储位置设为: {{.Name}}:/{{.Dir}}"
|
||||
@@ -267,10 +275,17 @@ bot:
|
||||
config:
|
||||
prompt_select_option: "请选择要配置的选项"
|
||||
button_filename_strategy: "文件名策略"
|
||||
button_conflict_strategy: "重名文件保存策略"
|
||||
error_invalid_callback_data: "无效的回调数据"
|
||||
error_invalid_template: "无效的模板, 请检查语法\n{{.Error}}"
|
||||
info_filename_strategy_set: "已将文件名策略设置为: {{.Strategy}}"
|
||||
info_conflict_strategy_set: "已将重名文件保存策略设置为: {{.Strategy}}"
|
||||
prompt_select_filename_strategy: "请选择文件名策略, 当前策略: {{.Strategy}}"
|
||||
prompt_select_conflict_strategy: "请选择重名文件保存策略, 当前策略: {{.Strategy}}"
|
||||
conflict_strategy_rename: "始终重命名"
|
||||
conflict_strategy_ask: "每次询问"
|
||||
conflict_strategy_overwrite: "始终覆盖"
|
||||
conflict_strategy_skip: "始终跳过"
|
||||
fnametmpl_help: |-
|
||||
使用该命令设置文件名模板, 示例:
|
||||
/fnametmpl 图片_{{"{{.msgid}}"}}_{{"{{.msgdate}}"}}.jpg
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -30,6 +31,7 @@ type Progress struct {
|
||||
ChatID int64
|
||||
start time.Time
|
||||
lastUpdatePercent atomic.Int32
|
||||
skippedFiles []string
|
||||
}
|
||||
|
||||
func (p *Progress) OnStart(ctx context.Context, info TaskInfo) {
|
||||
@@ -151,6 +153,14 @@ func (p *Progress) OnDone(ctx context.Context, info TaskInfo, err error) {
|
||||
styling.Code(strconv.Itoa(info.Count())),
|
||||
styling.Plain(i18n.T(i18nk.BotMsgProgressTotalSizePrefix, nil)),
|
||||
styling.Code(fmt.Sprintf("%.2f MB", float64(info.TotalSize())/(1024*1024))),
|
||||
func() styling.StyledTextOption {
|
||||
if len(p.skippedFiles) == 0 {
|
||||
return styling.Plain("")
|
||||
}
|
||||
return styling.Plain("\n\n" + i18n.T(i18nk.BotMsgCommonInfoConflictFilesSkipped, map[string]any{
|
||||
"Skipped": strings.Join(p.skippedFiles, "\n"),
|
||||
}))
|
||||
}(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -173,8 +183,13 @@ func (p *Progress) OnDone(ctx context.Context, info TaskInfo, err error) {
|
||||
}
|
||||
|
||||
func NewProgressTracker(messageID int, chatID int64) ProgressTracker {
|
||||
return NewProgressTrackerWithSkipped(messageID, chatID, nil)
|
||||
}
|
||||
|
||||
func NewProgressTrackerWithSkipped(messageID int, chatID int64, skippedFiles []string) ProgressTracker {
|
||||
return &Progress{
|
||||
MessageID: messageID,
|
||||
ChatID: chatID,
|
||||
MessageID: messageID,
|
||||
ChatID: chatID,
|
||||
skippedFiles: skippedFiles,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ type User struct {
|
||||
WatchChats []WatchChat
|
||||
FilenameStrategy string
|
||||
FilenameTemplate string
|
||||
ConflictStrategy string
|
||||
}
|
||||
|
||||
type WatchChat struct {
|
||||
|
||||
2
go.mod
2
go.mod
@@ -117,7 +117,7 @@ require (
|
||||
github.com/dgraph-io/ristretto/v2 v2.4.0
|
||||
github.com/dop251/goja v0.0.0-20260311135729-065cd970411c
|
||||
github.com/duke-git/lancet/v2 v2.3.9
|
||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.9.0
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/klauspost/compress v1.18.5 // indirect
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package ctxkey
|
||||
|
||||
// ENUM(content-length)
|
||||
// ENUM(content-length, overwrite-existing)
|
||||
//
|
||||
//go:generate go-enum --values --names --flag --nocase --noprefix
|
||||
type ContextKey string
|
||||
|
||||
@@ -14,12 +14,15 @@ import (
|
||||
const (
|
||||
// ContentLength is a ContextKey of type content-length.
|
||||
ContentLength ContextKey = "content-length"
|
||||
// OverwriteExisting is a ContextKey of type overwrite-existing.
|
||||
OverwriteExisting ContextKey = "overwrite-existing"
|
||||
)
|
||||
|
||||
var ErrInvalidContextKey = fmt.Errorf("not a valid ContextKey, try [%s]", strings.Join(_ContextKeyNames, ", "))
|
||||
|
||||
var _ContextKeyNames = []string{
|
||||
string(ContentLength),
|
||||
string(OverwriteExisting),
|
||||
}
|
||||
|
||||
// ContextKeyNames returns a list of possible string values of ContextKey.
|
||||
@@ -33,6 +36,7 @@ func ContextKeyNames() []string {
|
||||
func ContextKeyValues() []ContextKey {
|
||||
return []ContextKey{
|
||||
ContentLength,
|
||||
OverwriteExisting,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,7 +53,8 @@ func (x ContextKey) IsValid() bool {
|
||||
}
|
||||
|
||||
var _ContextKeyValue = map[string]ContextKey{
|
||||
"content-length": ContentLength,
|
||||
"content-length": ContentLength,
|
||||
"overwrite-existing": OverwriteExisting,
|
||||
}
|
||||
|
||||
// ParseContextKey attempts to convert a string to a ContextKey.
|
||||
|
||||
@@ -133,7 +133,7 @@ func (c *Client) Put(ctx context.Context, key string, r io.Reader, size int64) e
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("put object failed: %s", resp.Status)
|
||||
return responseError("put object", resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -170,10 +170,21 @@ func signRequest(req *http.Request, region, accessKey, secretKey string, payload
|
||||
req.Header.Set("x-amz-date", amzDate)
|
||||
req.Header.Set("x-amz-content-sha256", payloadHash)
|
||||
|
||||
// Canonical headers
|
||||
var headers []string
|
||||
// Canonical headers. Host is required by SigV4, but Go stores it on
|
||||
// Request.Host/URL.Host rather than in Request.Header.
|
||||
headerValues := map[string]string{
|
||||
"host": req.URL.Host,
|
||||
}
|
||||
if req.Host != "" {
|
||||
headerValues["host"] = req.Host
|
||||
}
|
||||
for k := range req.Header {
|
||||
headers = append(headers, strings.ToLower(k))
|
||||
headerValues[strings.ToLower(k)] = strings.TrimSpace(req.Header.Get(k))
|
||||
}
|
||||
|
||||
var headers []string
|
||||
for k := range headerValues {
|
||||
headers = append(headers, k)
|
||||
}
|
||||
sort.Strings(headers)
|
||||
|
||||
@@ -181,7 +192,7 @@ func signRequest(req *http.Request, region, accessKey, secretKey string, payload
|
||||
for _, k := range headers {
|
||||
canonicalHeaders.WriteString(k)
|
||||
canonicalHeaders.WriteString(":")
|
||||
canonicalHeaders.WriteString(strings.TrimSpace(req.Header.Get(k)))
|
||||
canonicalHeaders.WriteString(headerValues[k])
|
||||
canonicalHeaders.WriteString("\n")
|
||||
}
|
||||
|
||||
@@ -189,7 +200,7 @@ func signRequest(req *http.Request, region, accessKey, secretKey string, payload
|
||||
|
||||
canonicalRequest := strings.Join([]string{
|
||||
req.Method,
|
||||
req.URL.EscapedPath(),
|
||||
canonicalURI(req.URL.Path),
|
||||
req.URL.RawQuery,
|
||||
canonicalHeaders.String(),
|
||||
signedHeaders,
|
||||
@@ -219,3 +230,54 @@ func signRequest(req *http.Request, region, accessKey, secretKey string, payload
|
||||
req.Header.Set("Authorization", auth)
|
||||
return nil
|
||||
}
|
||||
|
||||
func responseError(operation string, resp *http.Response) error {
|
||||
body, err := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s failed: %s", operation, resp.Status)
|
||||
}
|
||||
|
||||
message := strings.TrimSpace(string(body))
|
||||
if message == "" {
|
||||
return fmt.Errorf("%s failed: %s", operation, resp.Status)
|
||||
}
|
||||
|
||||
return fmt.Errorf("%s failed: %s: %s", operation, resp.Status, message)
|
||||
}
|
||||
|
||||
func canonicalURI(path string) string {
|
||||
if path == "" {
|
||||
return "/"
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
for i := 0; i < len(path); i++ {
|
||||
c := path[i]
|
||||
if shouldEscapePathByte(c) {
|
||||
b.WriteByte('%')
|
||||
b.WriteByte("0123456789ABCDEF"[c>>4])
|
||||
b.WriteByte("0123456789ABCDEF"[c&15])
|
||||
continue
|
||||
}
|
||||
b.WriteByte(c)
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func shouldEscapePathByte(c byte) bool {
|
||||
if c >= 'A' && c <= 'Z' {
|
||||
return false
|
||||
}
|
||||
if c >= 'a' && c <= 'z' {
|
||||
return false
|
||||
}
|
||||
if c >= '0' && c <= '9' {
|
||||
return false
|
||||
}
|
||||
switch c {
|
||||
case '-', '.', '_', '~', '/':
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,31 @@ const (
|
||||
TypeCancel = "cancel"
|
||||
)
|
||||
|
||||
const (
|
||||
ConflictStrategyRename = "rename"
|
||||
ConflictStrategyAsk = "ask"
|
||||
ConflictStrategyOverwrite = "overwrite"
|
||||
ConflictStrategySkip = "skip"
|
||||
)
|
||||
|
||||
func ConflictStrategyValues() []string {
|
||||
return []string{
|
||||
ConflictStrategyRename,
|
||||
ConflictStrategyAsk,
|
||||
ConflictStrategyOverwrite,
|
||||
ConflictStrategySkip,
|
||||
}
|
||||
}
|
||||
|
||||
func IsConflictStrategy(strategy string) bool {
|
||||
for _, value := range ConflictStrategyValues() {
|
||||
if strategy == value {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// type TaskDataTGFiles struct {
|
||||
// Files []tfile.TGFileMessage
|
||||
// AsBatch bool
|
||||
@@ -34,6 +59,8 @@ type Add struct {
|
||||
SelectedStorName string
|
||||
DirID uint
|
||||
SettedDir bool
|
||||
SelectedDirPath string
|
||||
ConflictStrategy string
|
||||
// tfiles
|
||||
Files []tfile.TGFileMessage
|
||||
AsBatch bool
|
||||
|
||||
@@ -108,8 +108,10 @@ func (a *Alist) Save(ctx context.Context, reader io.Reader, storagePath string)
|
||||
ext := path.Ext(storagePath)
|
||||
base := strings.TrimSuffix(storagePath, ext)
|
||||
candidate := storagePath
|
||||
for i := 1; a.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if overwrite, _ := ctx.Value(ctxkey.OverwriteExisting).(bool); !overwrite {
|
||||
for i := 1; a.existsPath(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
}
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, a.baseURL+"/api/fs/put", reader)
|
||||
@@ -158,6 +160,10 @@ func (a *Alist) JoinStoragePath(p string) string {
|
||||
}
|
||||
|
||||
func (a *Alist) Exists(ctx context.Context, storagePath string) bool {
|
||||
return a.existsPath(ctx, a.JoinStoragePath(storagePath))
|
||||
}
|
||||
|
||||
func (a *Alist) existsPath(ctx context.Context, storagePath string) bool {
|
||||
// POST /api/fs/get
|
||||
/*
|
||||
body:
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package storage
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
|
||||
)
|
||||
|
||||
type contextKey struct{}
|
||||
|
||||
@@ -20,3 +24,7 @@ func FromContext(ctx context.Context) Storage {
|
||||
}
|
||||
return storage
|
||||
}
|
||||
|
||||
func WithOverwrite(ctx context.Context) context.Context {
|
||||
return context.WithValue(ctx, ctxkey.OverwriteExisting, true)
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/charmbracelet/log"
|
||||
"github.com/duke-git/lancet/v2/fileutil"
|
||||
config "github.com/krau/SaveAny-Bot/config/storage"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
|
||||
storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage"
|
||||
"github.com/krau/SaveAny-Bot/pkg/storagetypes"
|
||||
)
|
||||
@@ -56,8 +57,10 @@ func (l *Local) Save(ctx context.Context, r io.Reader, storagePath string) error
|
||||
ext := filepath.Ext(storagePath)
|
||||
base := strings.TrimSuffix(storagePath, ext)
|
||||
candidate := storagePath
|
||||
for i := 1; l.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if overwrite, _ := ctx.Value(ctxkey.OverwriteExisting).(bool); !overwrite {
|
||||
for i := 1; l.existsPath(candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
}
|
||||
}
|
||||
|
||||
absPath, err := filepath.Abs(candidate)
|
||||
@@ -77,6 +80,10 @@ func (l *Local) Save(ctx context.Context, r io.Reader, storagePath string) error
|
||||
}
|
||||
|
||||
func (l *Local) Exists(ctx context.Context, storagePath string) bool {
|
||||
return l.existsPath(l.JoinStoragePath(storagePath))
|
||||
}
|
||||
|
||||
func (l *Local) existsPath(storagePath string) bool {
|
||||
absPath, err := filepath.Abs(storagePath)
|
||||
if err != nil {
|
||||
return false
|
||||
|
||||
@@ -81,12 +81,14 @@ func (m *Minio) Save(ctx context.Context, r io.Reader, storagePath string) error
|
||||
ext := path.Ext(storagePath)
|
||||
base := strings.TrimSuffix(storagePath, ext)
|
||||
candidate := storagePath
|
||||
for i := 1; m.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 10 {
|
||||
m.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
if overwrite, _ := ctx.Value(ctxkey.OverwriteExisting).(bool); !overwrite {
|
||||
for i := 1; m.existsObject(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 10 {
|
||||
m.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
size := int64(-1)
|
||||
@@ -106,6 +108,10 @@ func (m *Minio) Save(ctx context.Context, r io.Reader, storagePath string) error
|
||||
|
||||
func (m *Minio) Exists(ctx context.Context, storagePath string) bool {
|
||||
m.logger.Debugf("Checking if file exists at %s", storagePath)
|
||||
return m.existsObject(ctx, m.JoinStoragePath(storagePath))
|
||||
}
|
||||
|
||||
func (m *Minio) existsObject(ctx context.Context, storagePath string) bool {
|
||||
_, err := m.client.StatObject(ctx, m.config.BucketName, storagePath, minio.StatObjectOptions{})
|
||||
return err == nil
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
config "github.com/krau/SaveAny-Bot/config/storage"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
|
||||
storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage"
|
||||
"github.com/krau/SaveAny-Bot/pkg/storagetypes"
|
||||
"github.com/rs/xid"
|
||||
@@ -107,12 +108,14 @@ func (r *Rclone) Save(ctx context.Context, reader io.Reader, storagePath string)
|
||||
ext := path.Ext(storagePath)
|
||||
base := strings.TrimSuffix(storagePath, ext)
|
||||
candidate := storagePath
|
||||
for i := 1; r.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 100 {
|
||||
r.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
if overwrite, _ := ctx.Value(ctxkey.OverwriteExisting).(bool); !overwrite {
|
||||
for i := 1; r.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 100 {
|
||||
r.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -70,13 +70,15 @@ func (m *S3) Save(ctx context.Context, r io.Reader, storagePath string) error {
|
||||
base := strings.TrimSuffix(storagePath, ext)
|
||||
candidate := storagePath
|
||||
|
||||
// Unique filename
|
||||
for i := 1; m.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 10 {
|
||||
m.logger.Errorf("Too many attempts for unique filename: %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
if overwrite, _ := ctx.Value(ctxkey.OverwriteExisting).(bool); !overwrite {
|
||||
// Unique filename
|
||||
for i := 1; m.existsKey(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 10 {
|
||||
m.logger.Errorf("Too many attempts for unique filename: %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,5 +101,9 @@ func (m *S3) Save(ctx context.Context, r io.Reader, storagePath string) error {
|
||||
func (m *S3) Exists(ctx context.Context, storagePath string) bool {
|
||||
m.logger.Debugf("Checking if file exists at %s", storagePath)
|
||||
|
||||
return m.client.Exists(ctx, storagePath)
|
||||
return m.existsKey(ctx, m.JoinStoragePath(storagePath))
|
||||
}
|
||||
|
||||
func (m *S3) existsKey(ctx context.Context, key string) bool {
|
||||
return m.client.Exists(ctx, key)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/charmbracelet/log"
|
||||
config "github.com/krau/SaveAny-Bot/config/storage"
|
||||
"github.com/krau/SaveAny-Bot/pkg/enums/ctxkey"
|
||||
storenum "github.com/krau/SaveAny-Bot/pkg/enums/storage"
|
||||
"github.com/krau/SaveAny-Bot/pkg/storagetypes"
|
||||
"github.com/rs/xid"
|
||||
@@ -57,12 +58,14 @@ func (w *Webdav) Save(ctx context.Context, r io.Reader, storagePath string) erro
|
||||
ext := path.Ext(storagePath)
|
||||
base := strings.TrimSuffix(storagePath, ext)
|
||||
candidate := storagePath
|
||||
for i := 1; w.Exists(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 1000 {
|
||||
w.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
if overwrite, _ := ctx.Value(ctxkey.OverwriteExisting).(bool); !overwrite {
|
||||
for i := 1; w.existsPath(ctx, candidate); i++ {
|
||||
candidate = fmt.Sprintf("%s_%d%s", base, i, ext)
|
||||
if i > 1000 {
|
||||
w.logger.Errorf("Too many attempts to find a unique filename for %s", storagePath)
|
||||
candidate = fmt.Sprintf("%s_%s%s", base, xid.New().String(), ext)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,6 +82,10 @@ func (w *Webdav) Save(ctx context.Context, r io.Reader, storagePath string) erro
|
||||
|
||||
func (w *Webdav) Exists(ctx context.Context, storagePath string) bool {
|
||||
w.logger.Debugf("Checking if file exists at %s", storagePath)
|
||||
return w.existsPath(ctx, w.JoinStoragePath(storagePath))
|
||||
}
|
||||
|
||||
func (w *Webdav) existsPath(ctx context.Context, storagePath string) bool {
|
||||
exists, err := w.client.Exists(ctx, storagePath)
|
||||
if err != nil {
|
||||
w.logger.Errorf("Failed to check if file exists at %s: %v", storagePath, err)
|
||||
|
||||
Reference in New Issue
Block a user