fix: update aria2 configuration to include KeepFile option and improve download handling
This commit is contained in:
@@ -36,10 +36,10 @@ type Config struct {
|
||||
}
|
||||
|
||||
type aria2Config struct {
|
||||
Enable bool `toml:"enable" mapstructure:"enable" json:"enable"`
|
||||
Url string `toml:"url" mapstructure:"url" json:"url"`
|
||||
Secret string `toml:"secret" mapstructure:"secret" json:"secret"`
|
||||
RemoveAfterTransfer bool `toml:"remove_after_transfer" mapstructure:"remove_after_transfer" json:"remove_after_transfer"`
|
||||
Enable bool `toml:"enable" mapstructure:"enable" json:"enable"`
|
||||
Url string `toml:"url" mapstructure:"url" json:"url"`
|
||||
Secret string `toml:"secret" mapstructure:"secret" json:"secret"`
|
||||
KeepFile bool `toml:"keep_file" mapstructure:"keep_file" json:"keep_file"`
|
||||
}
|
||||
|
||||
var cfg = &Config{}
|
||||
|
||||
@@ -23,58 +23,51 @@ func (t *Task) Execute(ctx context.Context) error {
|
||||
t.Progress.OnStart(ctx, t)
|
||||
}
|
||||
|
||||
// Wait for aria2 download to complete
|
||||
if err := t.waitForDownload(ctx); err != nil {
|
||||
logger.Errorf("Aria2 download failed: %v", err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Transfer downloaded files to storage
|
||||
if err := t.transferFiles(ctx); err != nil {
|
||||
logger.Errorf("File transfer failed: %v", err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Infof("Aria2 task %s completed successfully", t.ID)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, nil)
|
||||
}
|
||||
|
||||
// Clean up aria2 download result
|
||||
if _, err := t.Aria2Client.RemoveDownloadResult(ctx, t.GID); err != nil {
|
||||
logger.Warnf("Failed to remove aria2 download result: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForDownload waits for aria2 to complete the download
|
||||
func (t *Task) waitForDownload(ctx context.Context) error {
|
||||
logger := log.FromContext(ctx)
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
var status *aria2.Status
|
||||
var err error
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Warn("Aria2 task canceled")
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, ctx.Err())
|
||||
}
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
// Try to get status from active/waiting queue first
|
||||
status, err = t.Aria2Client.TellStatus(ctx, t.GID)
|
||||
status, err := t.getStatus(ctx)
|
||||
if err != nil {
|
||||
// If GID not found in active queue, check stopped queue
|
||||
logger.Debugf("Task not in active queue, checking stopped queue: %v", err)
|
||||
stoppedTasks, stopErr := t.Aria2Client.TellStopped(ctx, -1, 100)
|
||||
if stopErr != nil {
|
||||
logger.Errorf("Failed to get stopped tasks: %v", stopErr)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return fmt.Errorf("failed to get aria2 status: %w", err)
|
||||
}
|
||||
|
||||
// Find our task in stopped queue
|
||||
found := false
|
||||
for _, task := range stoppedTasks {
|
||||
if task.GID == t.GID {
|
||||
status = &task
|
||||
found = true
|
||||
logger.Debugf("Found task in stopped queue with status: %s", status.Status)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
logger.Errorf("Task GID %s not found in active or stopped queue", t.GID)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return fmt.Errorf("aria2 task not found: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if status.Status != "active" && status.Status != "waiting" {
|
||||
logger.Debugf("Aria2 GID %s status: %s, completed: %s/%s",
|
||||
t.GID, status.Status, status.CompletedLength, status.TotalLength)
|
||||
return err
|
||||
}
|
||||
|
||||
if t.Progress != nil {
|
||||
@@ -83,163 +76,149 @@ func (t *Task) Execute(ctx context.Context) error {
|
||||
|
||||
// Check if download is complete
|
||||
if status.IsDownloadComplete() {
|
||||
// Check if this is a metadata download (torrent/magnet) that spawned follow-up downloads
|
||||
// Handle metadata downloads (torrent/magnet) that spawn follow-up downloads
|
||||
if len(status.FollowedBy) > 0 {
|
||||
logger.Infof("Aria2 GID %s completed and spawned follow-up downloads: %v", t.GID, status.FollowedBy)
|
||||
logger.Infof("Switching to follow-up download GID: %s", status.FollowedBy[0])
|
||||
// Update to the follow-up GID (usually the actual file download)
|
||||
logger.Infof("Switching from metadata GID %s to actual download GID: %s", t.GID, status.FollowedBy[0])
|
||||
t.GID = status.FollowedBy[0]
|
||||
// Continue monitoring the new GID
|
||||
continue
|
||||
}
|
||||
logger.Infof("Aria2 download completed for GID %s", t.GID)
|
||||
goto TransferFiles
|
||||
logger.Infof("Download completed for GID %s", t.GID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check for errors
|
||||
if status.IsDownloadError() {
|
||||
err := fmt.Errorf("aria2 download error: %s (code: %s)", status.ErrorMessage, status.ErrorCode)
|
||||
logger.Errorf("Aria2 download failed: %v", err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return err
|
||||
return fmt.Errorf("aria2 download error: %s (code: %s)", status.ErrorMessage, status.ErrorCode)
|
||||
}
|
||||
|
||||
// Check if removed
|
||||
if status.IsDownloadRemoved() {
|
||||
err := errors.New("aria2 download was removed")
|
||||
logger.Error("Aria2 download was removed")
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return err
|
||||
return errors.New("aria2 download was removed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TransferFiles:
|
||||
// Get final status to get file list
|
||||
status, err = t.Aria2Client.TellStatus(ctx, t.GID)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to get final status: %v", err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
// getStatus retrieves the current status of the download
|
||||
func (t *Task) getStatus(ctx context.Context) (*aria2.Status, error) {
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
// Try active/waiting queue first
|
||||
status, err := t.Aria2Client.TellStatus(ctx, t.GID)
|
||||
if err == nil {
|
||||
return status, nil
|
||||
}
|
||||
|
||||
// Check stopped queue
|
||||
logger.Debugf("Task not in active queue, checking stopped queue")
|
||||
stoppedTasks, stopErr := t.Aria2Client.TellStopped(ctx, -1, 100)
|
||||
if stopErr != nil {
|
||||
return nil, fmt.Errorf("failed to get aria2 status: %w", err)
|
||||
}
|
||||
|
||||
for _, task := range stoppedTasks {
|
||||
if task.GID == t.GID {
|
||||
logger.Debugf("Found task in stopped queue with status: %s", task.Status)
|
||||
return &task, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("task GID %s not found: %w", t.GID, err)
|
||||
}
|
||||
|
||||
// transferFiles transfers downloaded files from aria2 to storage
|
||||
func (t *Task) transferFiles(ctx context.Context) error {
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
status, err := t.Aria2Client.TellStatus(ctx, t.GID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get final status: %w", err)
|
||||
}
|
||||
|
||||
if len(status.Files) == 0 {
|
||||
err := errors.New("no files in aria2 download")
|
||||
logger.Error("No files in aria2 download")
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return err
|
||||
return errors.New("no files in aria2 download")
|
||||
}
|
||||
|
||||
// Transfer files to storage
|
||||
logger.Infof("Transferring %d file(s) to storage %s", len(status.Files), t.Storage.Name())
|
||||
transferredCount := 0
|
||||
|
||||
for _, file := range status.Files {
|
||||
if file.Selected != "true" {
|
||||
logger.Debugf("Skipping unselected file: %s", file.Path)
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip torrent files (they are metadata, not the actual content)
|
||||
fileName := filepath.Base(file.Path)
|
||||
|
||||
// Skip torrent metadata files
|
||||
if filepath.Ext(fileName) == ".torrent" {
|
||||
logger.Debugf("Skipping torrent metadata file: %s", file.Path)
|
||||
// Still remove it if configured
|
||||
if config.C().Aria2.RemoveAfterTransfer {
|
||||
if err := os.Remove(file.Path); err != nil {
|
||||
logger.Warnf("Failed to remove torrent file %s: %v", file.Path, err)
|
||||
} else {
|
||||
logger.Debugf("Removed torrent file %s", file.Path)
|
||||
}
|
||||
}
|
||||
logger.Debugf("Skipping torrent metadata file: %s", fileName)
|
||||
t.removeFileIfNeeded(file.Path)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if file exists
|
||||
if _, err := os.Stat(file.Path); os.IsNotExist(err) {
|
||||
logger.Warnf("Downloaded file not found: %s", file.Path)
|
||||
continue
|
||||
if err := t.transferFile(ctx, file.Path); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Open file
|
||||
f, err := os.Open(file.Path)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to open file %s: %v", file.Path, err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return fmt.Errorf("failed to open file %s: %w", file.Path, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Get file info
|
||||
fileInfo, err := f.Stat()
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to stat file %s: %v", file.Path, err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return fmt.Errorf("failed to stat file %s: %w", file.Path, err)
|
||||
}
|
||||
|
||||
// Set content length in context for storage
|
||||
ctx = context.WithValue(ctx, ctxkey.ContentLength, fileInfo.Size())
|
||||
|
||||
// Determine destination path
|
||||
fileName = filepath.Base(file.Path)
|
||||
destPath := filepath.Join(t.StorPath, fileName)
|
||||
|
||||
logger.Infof("Transferring file %s to %s:%s", fileName, t.Storage.Name(), destPath)
|
||||
|
||||
// Save to storage
|
||||
err = t.Storage.Save(ctx, f, destPath)
|
||||
if err != nil {
|
||||
logger.Errorf("Failed to save file %s to storage: %v", fileName, err)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return fmt.Errorf("failed to save file %s to storage: %w", fileName, err)
|
||||
}
|
||||
|
||||
logger.Infof("Successfully transferred file %s", fileName)
|
||||
|
||||
transferredCount++
|
||||
|
||||
// Optionally remove the local file after successful transfer
|
||||
if config.C().Aria2.RemoveAfterTransfer {
|
||||
if err := os.Remove(file.Path); err != nil {
|
||||
logger.Warnf("Failed to remove local file %s: %v", file.Path, err)
|
||||
} else {
|
||||
logger.Debugf("Removed local file %s", file.Path)
|
||||
}
|
||||
}
|
||||
t.removeFileIfNeeded(file.Path)
|
||||
}
|
||||
|
||||
if transferredCount == 0 {
|
||||
err := errors.New("no files were transferred")
|
||||
logger.Error("No files were transferred to storage")
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Infof("Aria2 task %s completed successfully, transferred %d file(s)", t.ID, transferredCount)
|
||||
if t.Progress != nil {
|
||||
t.Progress.OnDone(ctx, t, nil)
|
||||
}
|
||||
|
||||
// Clean up aria2 download result
|
||||
_, err = t.Aria2Client.RemoveDownloadResult(ctx, t.GID)
|
||||
if err != nil {
|
||||
logger.Warnf("Failed to remove aria2 download result: %v", err)
|
||||
return errors.New("no files were transferred")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// transferFile transfers a single file to storage
|
||||
func (t *Task) transferFile(ctx context.Context, filePath string) error {
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
// Check if file exists
|
||||
fileInfo, err := os.Stat(filePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
logger.Warnf("Downloaded file not found: %s", filePath)
|
||||
return nil // Not a fatal error, continue with other files
|
||||
}
|
||||
return fmt.Errorf("failed to stat file %s: %w", filePath, err)
|
||||
}
|
||||
|
||||
// Open file
|
||||
f, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s: %w", filePath, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Set content length in context for storage
|
||||
ctx = context.WithValue(ctx, ctxkey.ContentLength, fileInfo.Size())
|
||||
|
||||
// Save to storage
|
||||
fileName := filepath.Base(filePath)
|
||||
destPath := filepath.Join(t.StorPath, fileName)
|
||||
|
||||
logger.Infof("Transferring file %s to %s:%s", fileName, t.Storage.Name(), destPath)
|
||||
|
||||
if err := t.Storage.Save(ctx, f, destPath); err != nil {
|
||||
return fmt.Errorf("failed to save file %s to storage: %w", fileName, err)
|
||||
}
|
||||
|
||||
logger.Infof("Successfully transferred file %s", fileName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeFileIfNeeded removes a file if RemoveAfterTransfer is enabled
|
||||
func (t *Task) removeFileIfNeeded(filePath string) {
|
||||
if config.C().Aria2.KeepFile {
|
||||
return
|
||||
}
|
||||
|
||||
logger := log.FromContext(t.ctx)
|
||||
if err := os.Remove(filePath); err != nil {
|
||||
logger.Warnf("Failed to remove local file %s: %v", filePath, err)
|
||||
} else {
|
||||
logger.Debugf("Removed local file %s", filePath)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user