From 27fe1ebe497bc15b39bb6336081d382ff73af88b Mon Sep 17 00:00:00 2001 From: krau <71133316+krau@users.noreply.github.com> Date: Thu, 25 Jun 2026 14:31:39 +0800 Subject: [PATCH] feat: add directory watching and auto-upload functionality --- cmd/root.go | 2 + cmd/watch/cmd.go | 145 +++++++++++++++++++++++ cmd/watch/uploader.go | 227 +++++++++++++++++++++++++++++++++++ cmd/watch/watcher.go | 269 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- 5 files changed, 644 insertions(+), 1 deletion(-) create mode 100644 cmd/watch/cmd.go create mode 100644 cmd/watch/uploader.go create mode 100644 cmd/watch/watcher.go diff --git a/cmd/root.go b/cmd/root.go index e11f616..35bb644 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/krau/SaveAny-Bot/cmd/upload" + "github.com/krau/SaveAny-Bot/cmd/watch" "github.com/krau/SaveAny-Bot/config" "github.com/spf13/cobra" ) @@ -18,6 +19,7 @@ var rootCmd = &cobra.Command{ func init() { config.RegisterFlags(rootCmd) upload.Register(rootCmd) + watch.Register(rootCmd) } func Execute(ctx context.Context) { diff --git a/cmd/watch/cmd.go b/cmd/watch/cmd.go new file mode 100644 index 0000000..aee8b88 --- /dev/null +++ b/cmd/watch/cmd.go @@ -0,0 +1,145 @@ +package watch + +import ( + "fmt" + "time" + + "github.com/charmbracelet/log" + "github.com/krau/SaveAny-Bot/client/bot" + "github.com/krau/SaveAny-Bot/common/cache" + "github.com/krau/SaveAny-Bot/common/i18n" + "github.com/krau/SaveAny-Bot/common/utils/tgutil" + "github.com/krau/SaveAny-Bot/config" + "github.com/krau/SaveAny-Bot/database" + stortype "github.com/krau/SaveAny-Bot/pkg/enums/storage" + "github.com/krau/SaveAny-Bot/storage" + "github.com/spf13/cobra" +) + +var watchCmd = &cobra.Command{ + Use: "watch", + Short: "watch a local directory and auto-upload changed files to storage", + Long: `Watch a local directory and automatically upload created or modified files +to the specified storage backend, preserving the relative directory structure. + +Example: + saveany-bot watch -p /data/inbox -s mystorage -d backup --recursive`, + RunE: runWatch, +} + +func Register(root *cobra.Command) { + flags := watchCmd.Flags() + flags.StringP("path", "p", "", "local directory path to watch") + watchCmd.MarkFlagRequired("path") + flags.StringP("storage", "s", "", "storage name to upload to") + watchCmd.MarkFlagRequired("storage") + flags.StringP("dir", "d", "", "storage dir to upload to, default is the base_path of the storage") + flags.BoolP("recursive", "r", false, "watch subdirectories recursively") + flags.Bool("overwrite", false, "overwrite existing files on storage instead of skipping") + flags.Bool("initial-scan", false, "upload existing files in the directory on startup") + flags.Duration("debounce", 2*time.Second, "wait time after the last change before uploading a file") + flags.Int("upload-workers", 0, "number of concurrent uploads, default is config.workers") + flags.Duration("retry-delay", 3*time.Second, "delay between upload retries") + root.AddCommand(watchCmd) +} + +func runWatch(cmd *cobra.Command, _ []string) error { + watchPath, err := cmd.Flags().GetString("path") + if err != nil { + return err + } + storName, err := cmd.Flags().GetString("storage") + if err != nil { + return err + } + destDir, err := cmd.Flags().GetString("dir") + if err != nil { + return err + } + recursive, err := cmd.Flags().GetBool("recursive") + if err != nil { + return err + } + overwrite, err := cmd.Flags().GetBool("overwrite") + if err != nil { + return err + } + initialScan, err := cmd.Flags().GetBool("initial-scan") + if err != nil { + return err + } + debounce, err := cmd.Flags().GetDuration("debounce") + if err != nil { + return err + } + uploadWorkers, err := cmd.Flags().GetInt("upload-workers") + if err != nil { + return err + } + retryDelay, err := cmd.Flags().GetDuration("retry-delay") + if err != nil { + return err + } + + ctx := cmd.Context() + logger := log.FromContext(ctx) + + configFile := config.GetConfigFile(cmd) + if err := config.Init(ctx, configFile); err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + i18n.Init(config.C().Lang) + cache.Init() + database.Init(ctx) + + stor, err := storage.GetStorageByName(ctx, storName) + if err != nil { + return fmt.Errorf("failed to get storage %q: %w", storName, err) + } + + // Telegram storage needs the bot client and its ext context injected into ctx. + if stor.Type() == stortype.Telegram { + bot.Init(ctx) + ctx = tgutil.ExtWithContext(ctx, bot.ExtContext()) + } + + if uploadWorkers < 1 { + uploadWorkers = config.C().Workers + } + + uploader := NewUploader(ctx, UploaderOptions{ + Storage: stor, + DestDir: destDir, + Overwrite: overwrite, + Workers: uploadWorkers, + Retry: config.C().Retry, + RetryDelay: retryDelay, + }) + + watcher, err := NewWatcher(ctx, WatcherOptions{ + Root: watchPath, + Recursive: recursive, + Debounce: debounce, + Uploader: uploader, + }) + if err != nil { + uploader.Close() + return fmt.Errorf("failed to create watcher: %w", err) + } + + if initialScan { + watcher.ScanExisting(ctx) + } + + logger.Infof("watch started: %s -> storage %q dir %q", watchPath, storName, destDir) + + // Run blocks until ctx is cancelled (e.g. SIGINT). + runErr := watcher.Run(ctx) + + // Wait for in-flight uploads to finish before exiting. + logger.Info("waiting for in-flight uploads to finish...") + uploader.Close() + logger.Info("watch stopped") + + return runErr +} diff --git a/cmd/watch/uploader.go b/cmd/watch/uploader.go new file mode 100644 index 0000000..66c06c1 --- /dev/null +++ b/cmd/watch/uploader.go @@ -0,0 +1,227 @@ +package watch + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "sync" + "time" + + "github.com/charmbracelet/log" + "github.com/krau/SaveAny-Bot/pkg/enums/ctxkey" + "github.com/krau/SaveAny-Bot/storage" +) + +type uploadJob struct { + // localPath is the absolute path of the local file. + localPath string + // relPath is relative to the watch root, used to preserve directory structure on storage. + relPath string +} + +// Uploader uploads local files to the target storage via a worker pool. +// If a file changes while being uploaded, it is re-uploaded once after the +// current upload finishes, instead of being queued multiple times. +type Uploader struct { + stor storage.Storage + destDir string + overwrite bool + retry int + retryDelay time.Duration + logger *log.Logger + + jobs chan uploadJob + wg sync.WaitGroup + + mu sync.Mutex + // inflight maps in-progress (or queued) file paths. A true value means the + // file changed during upload and must be re-queued once done. + inflight map[string]bool +} + +type UploaderOptions struct { + Storage storage.Storage + DestDir string + Overwrite bool + Workers int + Retry int + RetryDelay time.Duration + QueueSize int +} + +// NewUploader creates and starts an Uploader. The caller must call Close when done. +func NewUploader(ctx context.Context, opts UploaderOptions) *Uploader { + if opts.Workers < 1 { + opts.Workers = 1 + } + if opts.Retry < 1 { + opts.Retry = 1 + } + if opts.RetryDelay <= 0 { + opts.RetryDelay = 3 * time.Second + } + if opts.QueueSize < opts.Workers { + opts.QueueSize = opts.Workers * 64 + } + + u := &Uploader{ + stor: opts.Storage, + destDir: opts.DestDir, + overwrite: opts.Overwrite, + retry: opts.Retry, + retryDelay: opts.RetryDelay, + logger: log.FromContext(ctx).WithPrefix("uploader"), + jobs: make(chan uploadJob, opts.QueueSize), + inflight: make(map[string]bool), + } + + for i := 0; i < opts.Workers; i++ { + u.wg.Add(1) + go u.worker(ctx) + } + + return u +} + +// Submit enqueues an upload job. If the same file is already in flight, it is +// marked for re-upload instead of being queued again. Returns false if ctx is +// cancelled before the job can be enqueued. +func (u *Uploader) Submit(ctx context.Context, job uploadJob) bool { + u.mu.Lock() + if _, ok := u.inflight[job.localPath]; ok { + u.inflight[job.localPath] = true + u.mu.Unlock() + u.logger.Debugf("file %s already in flight, marked for re-upload", job.localPath) + return true + } + u.inflight[job.localPath] = false + u.mu.Unlock() + + select { + case u.jobs <- job: + return true + case <-ctx.Done(): + u.mu.Lock() + delete(u.inflight, job.localPath) + u.mu.Unlock() + return false + } +} + +func (u *Uploader) worker(ctx context.Context) { + defer u.wg.Done() + for { + select { + case <-ctx.Done(): + return + case job, ok := <-u.jobs: + if !ok { + return + } + u.process(ctx, job) + } + } +} + +func (u *Uploader) process(ctx context.Context, job uploadJob) { + if err := u.uploadWithRetry(ctx, job); err != nil { + if ctx.Err() != nil { + u.clearInflight(job.localPath) + return + } + u.logger.Errorf("failed to upload %s after %d attempt(s): %v", job.localPath, u.retry, err) + } + + // Re-queue if the file changed again while it was being uploaded. + u.mu.Lock() + needReupload := u.inflight[job.localPath] + if needReupload { + u.inflight[job.localPath] = false + } else { + delete(u.inflight, job.localPath) + } + u.mu.Unlock() + + if needReupload { + select { + case u.jobs <- job: + u.logger.Debugf("re-queued %s due to changes during upload", job.localPath) + case <-ctx.Done(): + u.clearInflight(job.localPath) + } + } +} + +func (u *Uploader) clearInflight(localPath string) { + u.mu.Lock() + delete(u.inflight, localPath) + u.mu.Unlock() +} + +func (u *Uploader) uploadWithRetry(ctx context.Context, job uploadJob) error { + var lastErr error + for attempt := 1; attempt <= u.retry; attempt++ { + if ctx.Err() != nil { + return ctx.Err() + } + err := u.upload(ctx, job) + if err == nil { + return nil + } + if ctx.Err() != nil { + return ctx.Err() + } + lastErr = err + u.logger.Warnf("upload %s failed (attempt %d/%d): %v", job.localPath, attempt, u.retry, err) + if attempt < u.retry { + select { + case <-time.After(u.retryDelay): + case <-ctx.Done(): + return ctx.Err() + } + } + } + return lastErr +} + +func (u *Uploader) upload(ctx context.Context, job uploadJob) error { + file, err := os.Open(filepath.Clean(job.localPath)) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + info, err := file.Stat() + if err != nil { + return fmt.Errorf("failed to stat file: %w", err) + } + if info.IsDir() { + return fmt.Errorf("path is a directory, not a file") + } + + // Keep the relative directory structure on the storage side. + storagePath := path.Join(u.destDir, filepath.ToSlash(job.relPath)) + + uploadCtx := context.WithValue(ctx, ctxkey.ContentLength, info.Size()) + if u.overwrite { + uploadCtx = storage.WithOverwrite(uploadCtx) + } else if u.stor.Exists(uploadCtx, storagePath) { + u.logger.Infof("skip existing file: %s", storagePath) + return nil + } + + u.logger.Infof("uploading %s -> %s (%d bytes)", job.localPath, storagePath, info.Size()) + if err := u.stor.Save(uploadCtx, file, storagePath); err != nil { + return fmt.Errorf("failed to save to storage: %w", err) + } + u.logger.Infof("uploaded %s", storagePath) + return nil +} + +// Close stops accepting jobs and waits for in-flight uploads to finish. +func (u *Uploader) Close() { + close(u.jobs) + u.wg.Wait() +} diff --git a/cmd/watch/watcher.go b/cmd/watch/watcher.go new file mode 100644 index 0000000..6921f38 --- /dev/null +++ b/cmd/watch/watcher.go @@ -0,0 +1,269 @@ +package watch + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "sync" + "time" + + "github.com/charmbracelet/log" + "github.com/fsnotify/fsnotify" +) + +// Watcher watches a local directory and submits stable files to the Uploader. +// +// Write-completion detection: fsnotify emits Write events throughout a write. +// Watcher debounces per file and only uploads once the file size stays +// unchanged across a debounce window, avoiding uploads of partial files. +type Watcher struct { + root string + recursive bool + debounce time.Duration + uploader *Uploader + logger *log.Logger + + fsw *fsnotify.Watcher + + mu sync.Mutex + pending map[string]*time.Timer + // lastSize is the last observed file size, used to detect a stable write. + lastSize map[string]int64 +} + +type WatcherOptions struct { + Root string + Recursive bool + Debounce time.Duration + Uploader *Uploader +} + +// NewWatcher creates a Watcher. +func NewWatcher(ctx context.Context, opts WatcherOptions) (*Watcher, error) { + if opts.Debounce <= 0 { + opts.Debounce = 2 * time.Second + } + root, err := filepath.Abs(opts.Root) + if err != nil { + return nil, fmt.Errorf("failed to resolve root path: %w", err) + } + info, err := os.Stat(root) + if err != nil { + return nil, fmt.Errorf("failed to stat root path: %w", err) + } + if !info.IsDir() { + return nil, fmt.Errorf("watch path must be a directory: %s", root) + } + + fsw, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("failed to create fsnotify watcher: %w", err) + } + + w := &Watcher{ + root: root, + recursive: opts.Recursive, + debounce: opts.Debounce, + uploader: opts.Uploader, + logger: log.FromContext(ctx).WithPrefix("watcher"), + fsw: fsw, + pending: make(map[string]*time.Timer), + lastSize: make(map[string]int64), + } + return w, nil +} + +// Run starts watching and blocks until ctx is cancelled. +func (w *Watcher) Run(ctx context.Context) error { + if err := w.addDir(w.root); err != nil { + w.fsw.Close() + return fmt.Errorf("failed to watch root: %w", err) + } + w.logger.Infof("watching %s (recursive=%v, debounce=%s)", w.root, w.recursive, w.debounce) + + defer w.cleanup() + + for { + select { + case <-ctx.Done(): + w.logger.Info("stopping watcher") + return nil + case event, ok := <-w.fsw.Events: + if !ok { + return nil + } + w.handleEvent(ctx, event) + case err, ok := <-w.fsw.Errors: + if !ok { + return nil + } + w.logger.Errorf("watch error: %v", err) + } + } +} + +func (w *Watcher) handleEvent(ctx context.Context, event fsnotify.Event) { + // Remove/Rename: cancel any pending upload for this path. + if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { + w.cancelPending(event.Name) + return + } + + if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Write) { + return + } + + info, err := os.Stat(event.Name) + if err != nil { + // File may have been removed or moved; ignore. + return + } + + if info.IsDir() { + // New directory: watch it recursively and scan files already inside. + if event.Has(fsnotify.Create) && w.recursive { + if err := w.addDir(event.Name); err != nil { + w.logger.Errorf("failed to watch new dir %s: %v", event.Name, err) + } + w.scanExisting(ctx, event.Name) + } + return + } + + w.scheduleUpload(ctx, event.Name) +} + +// scheduleUpload schedules a debounced upload for a file. +func (w *Watcher) scheduleUpload(ctx context.Context, file string) { + w.mu.Lock() + defer w.mu.Unlock() + + if t, ok := w.pending[file]; ok { + t.Stop() + } + w.pending[file] = time.AfterFunc(w.debounce, func() { + w.maybeUpload(ctx, file) + }) +} + +// maybeUpload submits the upload once the debounce window passes and the file +// size is stable; otherwise it waits another window. +func (w *Watcher) maybeUpload(ctx context.Context, file string) { + if ctx.Err() != nil { + return + } + + info, err := os.Stat(file) + if err != nil { + w.cancelPending(file) + return + } + if info.IsDir() { + w.cancelPending(file) + return + } + + w.mu.Lock() + prevSize, seen := w.lastSize[file] + curSize := info.Size() + if !seen || prevSize != curSize { + // Size still changing: likely still being written, wait another window. + w.lastSize[file] = curSize + w.pending[file] = time.AfterFunc(w.debounce, func() { + w.maybeUpload(ctx, file) + }) + w.mu.Unlock() + return + } + // Size stable: treat write as complete. + delete(w.pending, file) + delete(w.lastSize, file) + w.mu.Unlock() + + relPath, err := filepath.Rel(w.root, file) + if err != nil { + w.logger.Errorf("failed to compute relative path for %s: %v", file, err) + return + } + + w.uploader.Submit(ctx, uploadJob{localPath: file, relPath: relPath}) +} + +func (w *Watcher) cancelPending(file string) { + w.mu.Lock() + defer w.mu.Unlock() + if t, ok := w.pending[file]; ok { + t.Stop() + delete(w.pending, file) + } + delete(w.lastSize, file) +} + +// addDir adds a directory to the watch list, recursively when enabled. +func (w *Watcher) addDir(dir string) error { + if !w.recursive { + return w.fsw.Add(dir) + } + return filepath.WalkDir(dir, func(p string, d fs.DirEntry, err error) error { + if err != nil { + w.logger.Warnf("skip path %s: %v", p, err) + return nil + } + if d.IsDir() { + if addErr := w.fsw.Add(p); addErr != nil { + w.logger.Warnf("failed to watch dir %s: %v", p, addErr) + } else { + w.logger.Debugf("watching dir %s", p) + } + } + return nil + }) +} + +// scanExisting submits files already present under dir (initial sync and new-dir backfill). +func (w *Watcher) scanExisting(ctx context.Context, dir string) { + walkFn := func(p string, d fs.DirEntry, err error) error { + if err != nil { + w.logger.Warnf("skip path %s: %v", p, err) + return nil + } + if ctx.Err() != nil { + return ctx.Err() + } + if d.IsDir() { + if !w.recursive && p != dir { + return fs.SkipDir + } + return nil + } + relPath, relErr := filepath.Rel(w.root, p) + if relErr != nil { + w.logger.Errorf("failed to compute relative path for %s: %v", p, relErr) + return nil + } + w.uploader.Submit(ctx, uploadJob{localPath: p, relPath: relPath}) + return nil + } + if err := filepath.WalkDir(dir, walkFn); err != nil && ctx.Err() == nil { + w.logger.Errorf("failed to scan dir %s: %v", dir, err) + } +} + +// ScanExisting triggers a one-time scan and upload of existing files under the watch root. +func (w *Watcher) ScanExisting(ctx context.Context) { + w.logger.Info("scanning existing files for initial sync") + w.scanExisting(ctx, w.root) +} + +func (w *Watcher) cleanup() { + w.mu.Lock() + for _, t := range w.pending { + t.Stop() + } + w.pending = make(map[string]*time.Timer) + w.lastSize = make(map[string]int64) + w.mu.Unlock() + w.fsw.Close() +} diff --git a/go.mod b/go.mod index 4627c58..9f6c373 100644 --- a/go.mod +++ b/go.mod @@ -117,7 +117,7 @@ require ( github.com/dgraph-io/ristretto/v2 v2.4.0 github.com/dop251/goja v0.0.0-20260311135729-065cd970411c github.com/duke-git/lancet/v2 v2.3.9 - github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 github.com/glebarez/sqlite v1.11.0 github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.18.5 // indirect