From bc3c841d1d4683b710e249a8792396396c5d1c7c Mon Sep 17 00:00:00 2001 From: krau <71133316+krau@users.noreply.github.com> Date: Sat, 17 Jan 2026 15:19:49 +0800 Subject: [PATCH] fix: update aria2 configuration to include KeepFile option and improve download handling --- config/viper.go | 8 +- core/tasks/aria2dl/execute.go | 297 ++++++++++++++++------------------ 2 files changed, 142 insertions(+), 163 deletions(-) diff --git a/config/viper.go b/config/viper.go index 09a32e9..c7756da 100644 --- a/config/viper.go +++ b/config/viper.go @@ -36,10 +36,10 @@ type Config struct { } type aria2Config struct { - Enable bool `toml:"enable" mapstructure:"enable" json:"enable"` - Url string `toml:"url" mapstructure:"url" json:"url"` - Secret string `toml:"secret" mapstructure:"secret" json:"secret"` - RemoveAfterTransfer bool `toml:"remove_after_transfer" mapstructure:"remove_after_transfer" json:"remove_after_transfer"` + Enable bool `toml:"enable" mapstructure:"enable" json:"enable"` + Url string `toml:"url" mapstructure:"url" json:"url"` + Secret string `toml:"secret" mapstructure:"secret" json:"secret"` + KeepFile bool `toml:"keep_file" mapstructure:"keep_file" json:"keep_file"` } var cfg = &Config{} diff --git a/core/tasks/aria2dl/execute.go b/core/tasks/aria2dl/execute.go index 74e8520..fa33d2f 100644 --- a/core/tasks/aria2dl/execute.go +++ b/core/tasks/aria2dl/execute.go @@ -23,58 +23,51 @@ func (t *Task) Execute(ctx context.Context) error { t.Progress.OnStart(ctx, t) } + // Wait for aria2 download to complete + if err := t.waitForDownload(ctx); err != nil { + logger.Errorf("Aria2 download failed: %v", err) + if t.Progress != nil { + t.Progress.OnDone(ctx, t, err) + } + return err + } + + // Transfer downloaded files to storage + if err := t.transferFiles(ctx); err != nil { + logger.Errorf("File transfer failed: %v", err) + if t.Progress != nil { + t.Progress.OnDone(ctx, t, err) + } + return err + } + + logger.Infof("Aria2 task %s completed successfully", t.ID) + if t.Progress != nil { + t.Progress.OnDone(ctx, t, nil) + } + + // Clean up aria2 download result + if _, err := t.Aria2Client.RemoveDownloadResult(ctx, t.GID); err != nil { + logger.Warnf("Failed to remove aria2 download result: %v", err) + } + + return nil +} + +// waitForDownload waits for aria2 to complete the download +func (t *Task) waitForDownload(ctx context.Context) error { + logger := log.FromContext(ctx) ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() - var status *aria2.Status - var err error - for { select { case <-ctx.Done(): - logger.Warn("Aria2 task canceled") - if t.Progress != nil { - t.Progress.OnDone(ctx, t, ctx.Err()) - } return ctx.Err() case <-ticker.C: - // Try to get status from active/waiting queue first - status, err = t.Aria2Client.TellStatus(ctx, t.GID) + status, err := t.getStatus(ctx) if err != nil { - // If GID not found in active queue, check stopped queue - logger.Debugf("Task not in active queue, checking stopped queue: %v", err) - stoppedTasks, stopErr := t.Aria2Client.TellStopped(ctx, -1, 100) - if stopErr != nil { - logger.Errorf("Failed to get stopped tasks: %v", stopErr) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return fmt.Errorf("failed to get aria2 status: %w", err) - } - - // Find our task in stopped queue - found := false - for _, task := range stoppedTasks { - if task.GID == t.GID { - status = &task - found = true - logger.Debugf("Found task in stopped queue with status: %s", status.Status) - break - } - } - - if !found { - logger.Errorf("Task GID %s not found in active or stopped queue", t.GID) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return fmt.Errorf("aria2 task not found: %w", err) - } - } - - if status.Status != "active" && status.Status != "waiting" { - logger.Debugf("Aria2 GID %s status: %s, completed: %s/%s", - t.GID, status.Status, status.CompletedLength, status.TotalLength) + return err } if t.Progress != nil { @@ -83,163 +76,149 @@ func (t *Task) Execute(ctx context.Context) error { // Check if download is complete if status.IsDownloadComplete() { - // Check if this is a metadata download (torrent/magnet) that spawned follow-up downloads + // Handle metadata downloads (torrent/magnet) that spawn follow-up downloads if len(status.FollowedBy) > 0 { - logger.Infof("Aria2 GID %s completed and spawned follow-up downloads: %v", t.GID, status.FollowedBy) - logger.Infof("Switching to follow-up download GID: %s", status.FollowedBy[0]) - // Update to the follow-up GID (usually the actual file download) + logger.Infof("Switching from metadata GID %s to actual download GID: %s", t.GID, status.FollowedBy[0]) t.GID = status.FollowedBy[0] - // Continue monitoring the new GID continue } - logger.Infof("Aria2 download completed for GID %s", t.GID) - goto TransferFiles + logger.Infof("Download completed for GID %s", t.GID) + return nil } // Check for errors if status.IsDownloadError() { - err := fmt.Errorf("aria2 download error: %s (code: %s)", status.ErrorMessage, status.ErrorCode) - logger.Errorf("Aria2 download failed: %v", err) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return err + return fmt.Errorf("aria2 download error: %s (code: %s)", status.ErrorMessage, status.ErrorCode) } - // Check if removed if status.IsDownloadRemoved() { - err := errors.New("aria2 download was removed") - logger.Error("Aria2 download was removed") - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return err + return errors.New("aria2 download was removed") } } } +} -TransferFiles: - // Get final status to get file list - status, err = t.Aria2Client.TellStatus(ctx, t.GID) - if err != nil { - logger.Errorf("Failed to get final status: %v", err) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) +// getStatus retrieves the current status of the download +func (t *Task) getStatus(ctx context.Context) (*aria2.Status, error) { + logger := log.FromContext(ctx) + + // Try active/waiting queue first + status, err := t.Aria2Client.TellStatus(ctx, t.GID) + if err == nil { + return status, nil + } + + // Check stopped queue + logger.Debugf("Task not in active queue, checking stopped queue") + stoppedTasks, stopErr := t.Aria2Client.TellStopped(ctx, -1, 100) + if stopErr != nil { + return nil, fmt.Errorf("failed to get aria2 status: %w", err) + } + + for _, task := range stoppedTasks { + if task.GID == t.GID { + logger.Debugf("Found task in stopped queue with status: %s", task.Status) + return &task, nil } + } + + return nil, fmt.Errorf("task GID %s not found: %w", t.GID, err) +} + +// transferFiles transfers downloaded files from aria2 to storage +func (t *Task) transferFiles(ctx context.Context) error { + logger := log.FromContext(ctx) + + status, err := t.Aria2Client.TellStatus(ctx, t.GID) + if err != nil { return fmt.Errorf("failed to get final status: %w", err) } if len(status.Files) == 0 { - err := errors.New("no files in aria2 download") - logger.Error("No files in aria2 download") - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return err + return errors.New("no files in aria2 download") } - // Transfer files to storage logger.Infof("Transferring %d file(s) to storage %s", len(status.Files), t.Storage.Name()) transferredCount := 0 + for _, file := range status.Files { if file.Selected != "true" { logger.Debugf("Skipping unselected file: %s", file.Path) continue } - // Skip torrent files (they are metadata, not the actual content) fileName := filepath.Base(file.Path) + + // Skip torrent metadata files if filepath.Ext(fileName) == ".torrent" { - logger.Debugf("Skipping torrent metadata file: %s", file.Path) - // Still remove it if configured - if config.C().Aria2.RemoveAfterTransfer { - if err := os.Remove(file.Path); err != nil { - logger.Warnf("Failed to remove torrent file %s: %v", file.Path, err) - } else { - logger.Debugf("Removed torrent file %s", file.Path) - } - } + logger.Debugf("Skipping torrent metadata file: %s", fileName) + t.removeFileIfNeeded(file.Path) continue } - // Check if file exists - if _, err := os.Stat(file.Path); os.IsNotExist(err) { - logger.Warnf("Downloaded file not found: %s", file.Path) - continue + if err := t.transferFile(ctx, file.Path); err != nil { + return err } - - // Open file - f, err := os.Open(file.Path) - if err != nil { - logger.Errorf("Failed to open file %s: %v", file.Path, err) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return fmt.Errorf("failed to open file %s: %w", file.Path, err) - } - defer f.Close() - - // Get file info - fileInfo, err := f.Stat() - if err != nil { - logger.Errorf("Failed to stat file %s: %v", file.Path, err) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return fmt.Errorf("failed to stat file %s: %w", file.Path, err) - } - - // Set content length in context for storage - ctx = context.WithValue(ctx, ctxkey.ContentLength, fileInfo.Size()) - - // Determine destination path - fileName = filepath.Base(file.Path) - destPath := filepath.Join(t.StorPath, fileName) - - logger.Infof("Transferring file %s to %s:%s", fileName, t.Storage.Name(), destPath) - - // Save to storage - err = t.Storage.Save(ctx, f, destPath) - if err != nil { - logger.Errorf("Failed to save file %s to storage: %v", fileName, err) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return fmt.Errorf("failed to save file %s to storage: %w", fileName, err) - } - - logger.Infof("Successfully transferred file %s", fileName) + transferredCount++ - - // Optionally remove the local file after successful transfer - if config.C().Aria2.RemoveAfterTransfer { - if err := os.Remove(file.Path); err != nil { - logger.Warnf("Failed to remove local file %s: %v", file.Path, err) - } else { - logger.Debugf("Removed local file %s", file.Path) - } - } + t.removeFileIfNeeded(file.Path) } if transferredCount == 0 { - err := errors.New("no files were transferred") - logger.Error("No files were transferred to storage") - if t.Progress != nil { - t.Progress.OnDone(ctx, t, err) - } - return err - } - - logger.Infof("Aria2 task %s completed successfully, transferred %d file(s)", t.ID, transferredCount) - if t.Progress != nil { - t.Progress.OnDone(ctx, t, nil) - } - - // Clean up aria2 download result - _, err = t.Aria2Client.RemoveDownloadResult(ctx, t.GID) - if err != nil { - logger.Warnf("Failed to remove aria2 download result: %v", err) + return errors.New("no files were transferred") } return nil } + +// transferFile transfers a single file to storage +func (t *Task) transferFile(ctx context.Context, filePath string) error { + logger := log.FromContext(ctx) + + // Check if file exists + fileInfo, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + logger.Warnf("Downloaded file not found: %s", filePath) + return nil // Not a fatal error, continue with other files + } + return fmt.Errorf("failed to stat file %s: %w", filePath, err) + } + + // Open file + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file %s: %w", filePath, err) + } + defer f.Close() + + // Set content length in context for storage + ctx = context.WithValue(ctx, ctxkey.ContentLength, fileInfo.Size()) + + // Save to storage + fileName := filepath.Base(filePath) + destPath := filepath.Join(t.StorPath, fileName) + + logger.Infof("Transferring file %s to %s:%s", fileName, t.Storage.Name(), destPath) + + if err := t.Storage.Save(ctx, f, destPath); err != nil { + return fmt.Errorf("failed to save file %s to storage: %w", fileName, err) + } + + logger.Infof("Successfully transferred file %s", fileName) + return nil +} + +// removeFileIfNeeded removes a file if RemoveAfterTransfer is enabled +func (t *Task) removeFileIfNeeded(filePath string) { + if config.C().Aria2.KeepFile { + return + } + + logger := log.FromContext(t.ctx) + if err := os.Remove(filePath); err != nil { + logger.Warnf("Failed to remove local file %s: %v", filePath, err) + } else { + logger.Debugf("Removed local file %s", filePath) + } +}